1. Examples
Spark
Spark - Scala execution examples
Examples: http://spark.apache.org/examples.html
Word Count
val textFile = sc.textFile("./2M.ID.CONTENTS")
val counts = textFile.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
counts.saveAsTextFile("./output_wordcount_scala")
Spark Session (DataFrame)
JSON data
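The output shown below matches Spark's bundled people.json sample. A minimal sketch of producing such a DataFrame (the file path is an assumption):

```scala
import org.apache.spark.sql.SparkSession

// Build (or reuse) a SparkSession; in spark-shell this is already available as `spark`
val spark = SparkSession.builder().appName("json-example").getOrCreate()

// Read a JSON file into a DataFrame (path is illustrative)
val df = spark.read.json("examples/src/main/resources/people.json")
df.show()
```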
Output:
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
select
df.select($"name", $"age" + 1).show()
// show function declaration
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
// usage
scala> df.show(10, false)
scala> df.show(100, true)
// filtering
df.filter($"age" > 21).show()
// grouping
df.groupBy("age").count().show()
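The Dataset (DS) line that follows references a `peopleDS` that is not defined above. One way to obtain it from the JSON data, following the Spark SQL getting-started pattern (the case class and path are assumptions matching the columns shown earlier):

```scala
import spark.implicits._

// Case class matching the JSON schema (name, age)
case class Person(name: String, age: Long)

// Read the JSON file and convert to a typed Dataset[Person]
val peopleDS = spark.read.json("examples/src/main/resources/people.json").as[Person]
```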
// Dataset (DS)
peopleDS.filter("age is not null").map(p => p.name + ", " + p.age).show()
df.createOrReplaceTempView("cdump")
spark.sql("select count(*) from cdump").show()
spark.sql("select * from cdump limit 10").show()
spark.sql("select count(distinct authors) from cdump limit 10").show()
spark.sql("select distinct authors, genres, title, type, isbn_10, number_of_pages, languages from cdump order by title desc limit 10").show()
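Query results are ordinary DataFrames, so they can be written back out, for example as JSON (a sketch; the output path is an assumption):

```scala
// Run a query against the temp view and persist the result as JSON
// (output path is illustrative)
val result = spark.sql("select distinct authors, title from cdump limit 10")
result.write.json("./output_cdump_json")
```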
Spark Streaming
import org.apache.spark._
import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(5))       // batch interval of 5 seconds
val lines = ssc.socketTextStream("127.0.0.1", 9999)  // IP and port to receive text from
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

ssc.start()             // start receiving data (runs in a separate thread)
ssc.awaitTermination()  // wait for the streaming computation to terminate

// To test locally, run `nc -lk 9999` in another terminal and type words into it.