I am trying to run SQL queries on streaming data in Spark. It looks pretty straightforward, but when I try it I get an error: tablename>. Spark cannot find the table that I registered.
Using Spark SQL with batch data works fine, so I suspect the problem is related to how I call `streamingContext.start()`. Any ideas what the problem is? Here is the code:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext

// Assumed definition (not shown in the post): fields inferred from how Persons is built below
case class Persons(name: String, age: Int)

object Streaming {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqc = new SQLContext(sc)
    import sqc.createSchemaRDD

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people.txt")
    lines.foreachRDD(rdd => rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data"))
    // lines.foreachRDD(rdd=>rdd.foreach(println))
    val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
    ssc.start()
    ssc.awaitTermination()
  }
}
```
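To test whether the ordering around `start()` is the culprit, a variant could run the query inside `foreachRDD`. The body passed to `foreachRDD` runs on the driver once per batch, only after `ssc.start()`, whereas the top-level `sqc.sql(...)` call above runs once, immediately, before any batch has registered `data`. This is a minimal sketch assuming the same Spark 1.0/1.1 API as the code above (`createSchemaRDD`, `registerAsTable`); the object name and input path are hypothetical. Note that `textFileStream` watches a directory for newly created files rather than reading a single existing file.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Same record shape as above: name, age
case class Persons(name: String, age: Int)

object StreamingSqlPerBatch { // hypothetical name
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("StreamingSqlPerBatch")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val sqc = new SQLContext(sc)
    // Implicit conversion from an RDD of case classes to a SchemaRDD (Spark 1.0/1.1)
    import sqc.createSchemaRDD

    // Hypothetical path: a directory whose newly created files are read each batch
    val lines = ssc.textFileStream("path/to/input-dir")

    lines.foreachRDD { rdd =>
      // Runs on the driver once per batch, after ssc.start()
      val people = rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt))
      people.registerAsTable("data")
      // "data" is registered at this point, so the query can resolve it
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
      teenagers.collect().foreach(row => println(row(0)))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Each batch re-registers `data` with that batch's rows only, so the query sees just the records from files that arrived in the current 2-second interval.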
Any suggestions are welcome. Thanks.