Structured Streaming (2.1.0) writing to Parquet creates many small files

I am running Structured Streaming (2.1.0) on a 3-node YARN cluster and writing JSON streams to Parquet. My code snippet is as follows:

  val query = ds.writeStream
        .format("parquet")
        .option("checkpointLocation", "/data/kafka_streaming.checkpoint")
        .start("/data/kafka_streaming.parquet")

I noticed that it very quickly created thousands of small files for just 1000 records. I suspected the trigger frequency was the cause (with no trigger configured, a new micro-batch starts as soon as the previous one finishes), so I changed it:

  import org.apache.spark.sql.streaming.ProcessingTime

  val query = ds.writeStream
        .format("parquet")
        .option("checkpointLocation", "/data/kafka_streaming.checkpoint")
        .trigger(ProcessingTime("60 seconds"))  // <-- the added trigger
        .start("/data/kafka_streaming.parquet")

The difference is very noticeable: now far fewer files are created for the same number of records. Since the sink writes one file per partition per trigger, firing once every 60 seconds instead of as fast as possible packs many more records into each file.

My question is: is there a way to keep the trigger latency low while still producing fewer, larger output files?
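
One approach sometimes suggested for this trade-off (a sketch under assumptions, not from the original post) is to keep a short trigger interval but reduce the number of partitions just before the sink, so each trigger produces a single larger file. The 10-second interval and `coalesce(1)` below are illustrative values:

  import org.apache.spark.sql.streaming.ProcessingTime

  // Sketch only: coalesce(1) yields one file per trigger at the cost of
  // write parallelism; the 10-second interval is an illustrative value.
  val query = ds.coalesce(1)
        .writeStream
        .format("parquet")
        .option("checkpointLocation", "/data/kafka_streaming.checkpoint")
        .trigger(ProcessingTime("10 seconds"))
        .start("/data/kafka_streaming.parquet")

Note that the files will still be small if each micro-batch is small; periodically compacting the output directory with a separate batch job is another common option.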
