Why does Complete output mode require aggregation?

I am working with the latest Structured Streaming in Apache Spark 2.2 and received the following exception:

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;

Why does Complete output mode require a streaming aggregation? What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?

scala> spark.version
res0: String = 2.2.0

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]
val ids = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp"). // <-- convert time column from Int to Timestamp
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")  // <-- convert time column back from Timestamp to Int

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
scala> val q = ids.
     |   writeStream.
     |   format("memory").
     |   queryName("dups").
     |   outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing for Complete output mode only
     |   trigger(Trigger.ProcessingTime(30.seconds)).
     |   option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
     |   start
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
Project [cast(time#10 as bigint) AS time#15L, id#6]
+- Deduplicate [id#6], true
   +- Project [cast(time#5 as timestamp) AS time#10, id#6]
      +- Project [_1#2 AS time#5, _2#3 AS id#6]
         +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:115)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:247)
  ... 57 elided
1 answer

From the Structured Streaming Programming Guide, on other queries (excluding aggregations, mapGroupsWithState and flatMapGroupsWithState):

Complete mode is not supported, as it is infeasible to keep all unaggregated data in the Result Table.

To answer the question:

What would happen if Spark allowed Complete output mode with no aggregations in a streaming query?

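Most likely an OutOfMemoryError (OOM): Complete mode writes the entire result table on every trigger, so without an aggregation to bound that table Spark would have to retain every row the stream has ever produced.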
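To make the memory concern concrete, here is a toy model in plain Scala (no Spark involved). It only illustrates the reasoning: the `Event` case class, the object name and the batch sizes are all made up for this sketch, and it does not reproduce how Spark actually stores state.

```scala
// Toy model only: compares how much a "result table" would have to retain
// per micro-batch with and without an aggregation. Not Spark code.
object CompleteModeStateSketch {
  final case class Event(time: Int, id: Int)

  // Without aggregation, the full result is every row seen so far,
  // so the retained row count grows with the total input.
  def unaggregatedState(batches: Seq[Seq[Event]]): Seq[Int] =
    batches.scanLeft(Vector.empty[Event])(_ ++ _).tail.map(_.size)

  // With a count per id, only one entry per distinct key is retained,
  // so the state is bounded by the number of keys.
  def aggregatedState(batches: Seq[Seq[Event]]): Seq[Int] =
    batches.scanLeft(Map.empty[Int, Long]) { (counts, batch) =>
      batch.foldLeft(counts) { (acc, e) =>
        acc.updated(e.id, acc.getOrElse(e.id, 0L) + 1L)
      }
    }.tail.map(_.size)

  def main(args: Array[String]): Unit = {
    // Five micro-batches of 1000 events each, spread over 10 distinct ids.
    val batches = Seq.tabulate(5) { b =>
      Seq.tabulate(1000)(i => Event(b * 1000 + i, i % 10))
    }
    println(s"rows retained without aggregation: ${unaggregatedState(batches)}")
    println(s"keys retained with aggregation:    ${aggregatedState(batches)}")
  }
}
```

In this model the unaggregated size climbs by 1000 with every batch, while the aggregated size stays at 10 however long the stream runs. On an unbounded stream, the first pattern is what eventually exhausts memory.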

The puzzling part is why dropDuplicates("id") is not treated as an aggregation.
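As a possible workaround, the same deduplication can be run in Append mode, which the programming guide lists as supported for queries without aggregations. The sketch below is untested against the asker's setup and makes several assumptions: a local SparkSession, a hypothetical query name `dups_append`, and three made-up input rows. It also omits the checkpoint location, because the question's own comment says the memory sink supports checkpointing only in Complete mode.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode

object DedupAppendSketch {
  // Runs dropDuplicates("id") in Append mode and returns how many rows
  // reached the in-memory table. The table name is illustrative.
  def distinctIdCount(spark: SparkSession): Long = {
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    val source = MemoryStream[(Int, Int)]
    val ids = source.toDS().toDF("time", "id").dropDuplicates("id")

    val q = ids.writeStream
      .format("memory")
      .queryName("dups_append")
      .outputMode(OutputMode.Append()) // Append instead of Complete
      .start()                         // no checkpointLocation in this sketch

    try {
      source.addData((1, 1), (2, 1), (3, 2)) // id 1 arrives twice
      q.processAllAvailable()                // block until the input is processed
      spark.table("dups_append").count()
    } finally {
      q.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("dedup-append-sketch")
      .getOrCreate()
    try println(s"rows after deduplication: ${distinctIdCount(spark)}")
    finally spark.stop()
  }
}
```

With the three sample rows, two distinct ids reach the table. In Append mode each deduplicated row is emitted once, so the sink never needs the whole result table replayed on every trigger.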


Source: https://habr.com/ru/post/1270996/

