// Dataset (DS) example: peopleDS.filter("age is not null").map(p => p.name + ", " + p.age).show()
// Register the DataFrame `df` as a temporary view so it can be queried with Spark SQL.
df.createOrReplaceTempView("cdump")

// Total number of rows.
// count(*) rather than count(): Spark 3.x rejects a parameterless count() unless
// spark.sql.legacy.allowParameterlessCount is enabled.
spark.sql("select count(*) from cdump").show()

// First 10 rows, all columns.
spark.sql("select * from cdump limit 10").show()

// Number of distinct `authors` values (a single-row aggregate, so the LIMIT has no effect).
spark.sql("select count(distinct authors) from cdump limit 10").show()

// Distinct combinations of the selected columns, ordered by title descending, first 10.
spark.sql("select distinct authors,genres,title, type,isbn_10, number_of_pages, languages from cdump order by title desc limit 10").show()
// Spark Streaming word count over lines of text read from a TCP socket.
// Each statement must be on its own line: when collapsed onto one line, everything after
// the first `//` becomes a comment and the pipeline / awaitTermination never run.
// NOTE(review): assumes org.apache.spark.streaming.{Seconds, StreamingContext} is imported
// and `sc` is the active SparkContext earlier in the session — confirm.
val ssc = new StreamingContext(sc, Seconds(5)) // process data in 5-second batches
val lines = ssc.socketTextStream("127.0.0.1", 9999) // host and port to read text lines from
val words = lines.flatMap(_.split(" ")) // split each line on single spaces
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _) // sum the 1s per word within each batch
wordCounts.print()

ssc.start() // start receiving and processing data
ssc.awaitTermination() // block the calling thread until the streaming context stops