Queries with streaming sources must be executed with writeStream.start();

I am trying to read messages from Kafka (version 10) in Spark and print them.

  import spark.implicits._

  val spark = SparkSession
    .builder
    .appName("StructuredNetworkWordCount")
    .config("spark.master", "local")
    .getOrCreate()

  val ds1 = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topicA")
    .load()

  ds1.collect.foreach(println)

  ds1.writeStream
    .format("console")
    .start()

  ds1.printSchema()

I get an exception in thread "main":

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

5 answers

You are forking the query plan: from the same ds1 you are trying to:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...){...}

But you are only calling .start() on the second branch, leaving the first one dangling without a termination, which in turn throws the exception you are getting back.

The solution is to start both branches and wait for completion.

  val ds1 = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topicA")
    .load()

  // Start both branches of the plan as separate streaming queries
  val query1 = ds1.writeStream
    .format("console")
    .start()

  val query2 = ds1.writeStream
    .format("console")
    .start()

  ds1.printSchema()

  query1.awaitTermination()
  query2.awaitTermination()
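As a side note, every start() returns a StreamingQuery handle, so the two branches can be monitored or stopped independently; a minimal sketch using the StreamingQuery API (the println calls are just for illustration):

  // Each start() above returned a StreamingQuery handle
  println(query1.isActive)  // true while the branch is running
  println(query1.status)    // current status, e.g. waiting for data

  query2.stop()             // stop one branch without killing the application

  query1.awaitTermination() // block until the remaining branch terminates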

I fixed the problem using the following code.

  val df = session
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("subscribe", "streamTest2")
    .load()

  val query = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()

I struggled a lot with this issue. I tried each of the solutions suggested on various blogs. But in my case there were several statements between calling start() on the query and the final call to awaitTermination(), and that was what caused the problem.

Please try it this way; it works for me. Working example:

  df.writeStream
    .outputMode("append")
    .format("console")
    .start()
    .awaitTermination()

If you write it this way, it will throw an exception/error:

  val query = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

  // some statement
  // some statement

  query.awaitTermination()

This throws the exception above and shuts down your streaming driver.
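For what it's worth, in-between statements are typically only a problem when one of them runs a batch-style action (show(), collect(), count(), ...) against the streaming DataFrame; a minimal sketch of the offending pattern, reusing the df from above:

  val query = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

  // A batch-style action on a streaming DataFrame is what actually throws:
  // org.apache.spark.sql.AnalysisException: Queries with streaming sources
  // must be executed with writeStream.start();;
  df.show()

  query.awaitTermination()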


While reading the error message

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

I found an article that explains it well and takes a different approach. I will try it myself and post the results later if it works for me.


Please remove ds1.collect.foreach(println) and ds1.printSchema(); use outputMode and awaitAnyTermination for the background process, which blocks until any of the queries associated with spark.streams terminates.

  val spark = SparkSession
    .builder
    .appName("StructuredNetworkWordCount")
    .config("spark.master", "local[*]")
    .getOrCreate()

  val ds1 = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topicA")
    .load()

  val consoleOutput1 = ds1.writeStream
    .outputMode("update")
    .format("console")
    .start()

  spark.streams.awaitAnyTermination()

  +---+-----+-----+---------+------+
  |key|value|topic|partition|offset|
  +---+-----+-----+---------+------+
  +---+-----+-----+---------+------+
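Note that the Kafka source exposes key and value as binary columns, which is why the console output is not human-readable as-is; a minimal sketch (assuming UTF-8 string payloads) that casts them to strings before writing:

  // Cast the binary key/value columns to strings so the console sink prints text
  val readable = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

  readable.writeStream
    .outputMode("update")
    .format("console")
    .start()

  spark.streams.awaitAnyTermination()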

Source: https://habr.com/ru/post/1259767/

