Trying to run SparkSQL on top of Spark Streaming

I am trying to run SQL queries on streaming data in Spark. It looks pretty straightforward, but when I try it, I get an error about the table (<tablename>): Spark cannot find the table that I registered.

Using Spark SQL with batch data works great, so I think this is due to the way I call StreamingContext.start(). Any ideas what the problem is? Here is the code:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.sql.SQLContext

    object Streaming {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
        val sc = new SparkContext(sparkConf)
        // Create the context
        val ssc = new StreamingContext(sc, Seconds(2))

        val sqc = new SQLContext(sc);
        import sqc.createSchemaRDD

        // Create the FileInputDStream on the directory and use the
        // stream to count words in new files created
        val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
        lines.foreachRDD(rdd=>rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data"))
        // lines.foreachRDD(rdd=>rdd.foreach(println))
        val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
        ssc.start()
        ssc.awaitTermination()
      }
    }

Any suggestions are welcome. Thanks.

1 answer

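Well, I figured out the problem. You must query the data inside the foreachRDD function, otherwise the table will not be recognized. The body of foreachRDD only runs once a batch arrives after ssc.start(), whereas a query placed outside it runs once during setup, before any table has been registered. Something like this works: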

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.Duration

    // Not shown in the original post; the fields are assumed from the query below.
    case class Persons(name: String, age: Int)

    object Mlist {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
        val sc = new SparkContext(sparkConf)
        // Create the context
        val ssc = new StreamingContext(sc, Seconds(2))

        // Create the FileInputDStream on the directory and use the
        // stream to read new files created
        val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
        lines.foreachRDD(rdd=>rdd.foreach(println))

        val sqc = new SQLContext(sc);
        import sqc.createSchemaRDD

        // Register the table and run the query inside the same foreachRDD,
        // so the query runs once per batch, after the table exists
        lines.foreachRDD(rdd=>{
          rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
          val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
          teenagers.foreach(println)
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }
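
To see why the placement of the query matters, here is a small sketch in plain Scala that only imitates the ordering of events. It does not use Spark at all: `ToyStream`, `foreachBatch`, `registerAsTable`, `teenagers` and `start` are made-up names for this illustration, and the sample rows are invented. The assumption it models is that a callback passed to foreachRDD is merely stored at setup time and only executed after the context is started.

```scala
import scala.collection.mutable

// Toy model only: none of these names are Spark APIs.
class ToyStream {
  type Row = (String, Int) // (name, age)

  // Stand-in for the SQL catalog: table name -> rows.
  private val catalog = mutable.Map.empty[String, Seq[Row]]
  // Stand-in for the functions handed to foreachRDD.
  private val callbacks = mutable.Buffer.empty[Seq[Row] => Unit]

  def registerAsTable(name: String, rows: Seq[Row]): Unit =
    catalog(name) = rows

  // Like foreachRDD in this model: only stores the function, runs nothing.
  def foreachBatch(f: Seq[Row] => Unit): Unit =
    callbacks += f

  // Names of rows aged 13 to 19, or an error if the table is unknown.
  def teenagers(table: String): Either[String, Seq[String]] =
    catalog.get(table) match {
      case Some(rows) =>
        Right(rows.collect { case (name, age) if age >= 13 && age <= 19 => name })
      case None =>
        Left(s"table not found: $table")
    }

  // Like ssc.start() in this model: runs every stored callback per batch.
  def start(batches: Seq[Seq[Row]]): Unit =
    for (batch <- batches; callback <- callbacks) callback(batch)
}

object DeferredRegistrationSketch {
  // Returns (result of the setup-time query, results of the per-batch queries).
  def run(): (Either[String, Seq[String]], Seq[Either[String, Seq[String]]]) = {
    val stream = new ToyStream
    val perBatch = mutable.Buffer.empty[Either[String, Seq[String]]]

    stream.foreachBatch { rows =>
      stream.registerAsTable("data", rows)
      perBatch += stream.teenagers("data") // query inside the callback
    }

    // Query at setup time: no batch has been processed, so no table yet.
    val early = stream.teenagers("data")

    stream.start(Seq(Seq(("Ann", 15), ("Bob", 30)))) // invented sample batch
    (early, perBatch.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val (early, perBatch) = run()
    println(s"setup-time query: $early")
    println(s"per-batch queries: $perBatch")
  }
}
```

In this model the setup-time query fails because nothing has been registered yet, while the query issued inside the callback sees the table for that batch. That is the same ordering the working code above relies on.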

Source: https://habr.com/ru/post/1200805/