I am working with Structured Streaming in Apache Spark 2.2.0 and hit the following exception:
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Why does Complete output mode require a streaming aggregation? What would happen if Spark allowed Complete output mode for a streaming query that has no aggregation?
scala> spark.version
res0: String = 2.2.0

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
implicit val sqlContext: SQLContext = spark.sqlContext

val source = MemoryStream[(Int, Int)]
val ids = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp"). // <-- convert time column from Int to Timestamp
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")       // <-- convert time column back from Timestamp to Int

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

scala> val q = ids.
     |   writeStream.
     |   format("memory").
     |   queryName("dups").
     |   outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing for Complete output mode only
     |   trigger(Trigger.ProcessingTime(30.seconds)).
     |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
     |   start
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Project [cast(time
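For comparison, here is a minimal sketch of a variant of the same query that does contain a streaming aggregation (groupBy + count), and which the analyzer therefore accepts in Complete mode. The query name dupCounts is made up for this illustration; it assumes the same spark session as above and is not something I have taken from the Spark docs verbatim:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.streaming.OutputMode

implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]

// Count rows per id: a streaming aggregation, so the whole result
// table is bounded and Complete mode has something to re-emit.
val counts = source.toDS.toDF("time", "id").
  groupBy("id").
  count()

// With the aggregation in place, OutputMode.Complete passes analysis
// and the query starts without an AnalysisException.
val q = counts.
  writeStream.
  format("memory").
  queryName("dupCounts"). // <-- hypothetical name for this sketch
  outputMode(OutputMode.Complete).
  start
```

My understanding is that the aggregation is what makes the "complete result table" finite, which may be why the analyzer insists on it, but I would like that confirmed.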