1. Examples

Spark

Spark - Scala Execution Examples

Examples: http://spark.apache.org/examples.html

Word Count

val textFile = sc.textFile("./README.md")
val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("./output_scala")
// Simple map over a local Scala collection
val data = Array(1, 2, 3, 4, 5)
val result = data.map(a => a + 1)
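To spot-check the word-count result without writing it out, a quick sketch that reuses the counts RDD from above (printing the top 10 is just an illustrative choice):

// Print the 10 most frequent words from the counts RDD above
counts.sortBy(_._2, ascending = false).take(10).foreach(println)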

val textFile = sc.textFile("./2M.ID.CONTENTS")
val counts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("./output_wordcount_scala")

Spark Session (DataFrame)

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
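Outside spark-shell, the $"column" syntax used in the DataFrame examples below also needs the session's implicits; a minimal addition, assuming the spark value created above:

// Enables the $"colName" column syntax and common encoders
import spark.implicits._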

Loading JSON Data

$ cat sample.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

val df = spark.read.json("sample.json")
scala> df.show()

Output:

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
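The schema Spark inferred from the JSON can be checked as well; a small sketch (the commented output reflects the sample.json above):

df.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)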

select

df.select($"name", $"age" + 1).show()

// Declaration of the show function
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))

// Usage
scala> df.show(10, false)
scala> df.show(100, true)

// Filtering
df.filter($"age" > 21).show()

// Grouping
df.groupBy("age").count().show()

// Dataset (DS) example
peopleDS.filter("age is not null").map(p => p.name + ", " + p.age).show()
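peopleDS is not defined in the snippet above; a minimal sketch of how it could be built from the same sample.json, assuming a hypothetical Person case class (age is Option[Long] so the row with a null age is handled safely):

case class Person(name: String, age: Option[Long])

import spark.implicits._
val peopleDS = spark.read.json("sample.json").as[Person]

// Same idea as the line above; .get is safe here because null ages are filtered out first
peopleDS.filter("age is not null").map(p => p.name + ", " + p.age.get).show()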

df.createOrReplaceTempView("cdump")

spark.sql("select count() from cdump").show() spark.sql("select from cdump limit 10").show() spark.sql("select count(distinct authors) from cdump limit 10").show() spark.sql("select distinct authors,genres,title, type,isbn_10, number_of_pages, languages from cdump order by title desc limit 10").show()
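Query results can also be collected back to the driver as plain Scala values; a hedged sketch reusing the cdump view above (title is assumed to be a string column):

// Collect up to 10 titles into a local Array[String]
val titles = spark.sql("select title from cdump limit 10").collect().map(_.getString(0))
titles.foreach(println)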

Spark Streaming

import org.apache.spark._
import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(5))       // 5-second batch interval
val lines = ssc.socketTextStream("127.0.0.1", 9999)  // IP and port to receive text from
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

ssc.start()             // Start receiving data
ssc.awaitTermination()  // Wait for the computation to finish (processing runs on separate threads)
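To try this locally, the usual approach from the Spark streaming quick start is to feed text into port 9999 with netcat in another terminal before starting the job:

$ nc -lk 9999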
