I'm running Structured Streaming (Spark 2.1.0) on a 3-node YARN cluster, writing JSON records from Kafka to Parquet. My code snippet is as follows:
val query = ds.writeStream
.format("parquet")
.option("checkpointLocation", "/data/kafka_streaming.checkpoint")
.start("/data/kafka_streaming.parquet")
I noticed that it quickly creates thousands of small files for just 1000 records. I suspected I needed to set a trigger interval, so I changed the code:
val query = ds.writeStream
.format("parquet")
.option("checkpointLocation", "/data/kafka_streaming.checkpoint")
.trigger(ProcessingTime("60 seconds"))
.start("/data/kafka_streaming.parquet")
The difference is very obvious: now a much smaller number of files is created for the same number of records.
My question is: is there a way to keep the trigger latency low while still producing fewer, larger output files?
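For context, one variant I have been considering (not part of my original setup, so treat it as a sketch) is reducing the number of partitions before the sink, since each micro-batch writes roughly one file per partition; fewer partitions should mean fewer, larger files per trigger, at the cost of write parallelism:

```scala
import org.apache.spark.sql.streaming.ProcessingTime

// Assumption: ds is the streaming Dataset from above and one
// output file per trigger is acceptable for this workload.
val query = ds
  .coalesce(1) // collapse to a single partition before writing
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/data/kafka_streaming.checkpoint")
  .trigger(ProcessingTime("60 seconds"))
  .start("/data/kafka_streaming.parquet")
```

This still leaves one file per trigger interval, so it trades latency against file count rather than removing the trade-off entirely.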