Windowing with Apache Beam - fixed windows do not seem to close?

We are trying to use fixed windows in an Apache Beam pipeline (using the DirectRunner). Our flow is as follows:

  • Read data from Pub/Sub
  • Deserialize the JSON into a Java object
  • Window events with fixed windows of 5 seconds
  • Using a custom CombineFn, combine each window of Event objects into a List<Event>
  • For testing, simply print the resulting List<Event>

Pipeline Code:

    pipeline
        // Read from pubsub topic to create unbounded PCollection
        .apply(PubsubIO
            .<String>read()
            .topic(options.getTopic())
            .withCoder(StringUtf8Coder.of())
        )

        // Deserialize JSON into Event object
        .apply("ParseEvent", ParDo
            .of(new ParseEventFn())
        )

        // Window events with a fixed window size of 5 seconds
        .apply("Window", Window
            .<Event>into(FixedWindows
                .of(Duration.standardSeconds(5))
            )
        )

        // Group events by window
        .apply("CombineEvents", Combine
            .globally(new CombineEventsFn())
            .withoutDefaults()
        )

        // Log grouped events
        .apply("LogEvent", ParDo
            .of(new LogEventFn())
        );
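
As a rough illustration of what the fixed-window step above does, the following sketch computes which 5-second window a given timestamp falls into. It is plain Java without any Beam dependency; the names FixedWindowSketch, windowStart and windowEnd are made up for this example, and it assumes millisecond timestamps and a zero window offset.

```java
// Sketch of fixed-window assignment arithmetic (plain Java, no Beam dependency).
// All names here are hypothetical and exist only for illustration.
final class FixedWindowSketch {

    /** Returns the inclusive start (in ms) of the fixed window containing timestampMillis. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Returns the exclusive end (in ms) of that window. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, as in the pipeline above
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Assignment is based on each element's timestamp. With Beam's default trigger, the result for a window is emitted only once the watermark passes the end of that window, which is worth keeping in mind when no output appears.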

The result we see is that the final step never runs: we are not getting any log output.

In addition, we added a System.out.println("***") to each method of our custom CombineFn class to track when they are executed, and it seems that they are not run either.
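
The original CombineEventsFn is not shown, so the following is only a sketch of the kind of tracing described above. It is a plain-Java stand-in that reuses the four method names of Beam's CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput) but does not extend the Beam class, so it runs without Beam on the classpath. The class name CombineToListSketch and the use of String in place of Event are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in that mirrors the method names of Beam's CombineFn.
// It is NOT the original CombineEventsFn; names and types are hypothetical.
final class CombineToListSketch<T> {

    List<T> createAccumulator() {
        System.out.println("*** createAccumulator");
        return new ArrayList<>();
    }

    List<T> addInput(List<T> accumulator, T input) {
        System.out.println("*** addInput");
        accumulator.add(input);
        return accumulator;
    }

    List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
        System.out.println("*** mergeAccumulators");
        List<T> merged = new ArrayList<>();
        for (List<T> accumulator : accumulators) {
            merged.addAll(accumulator);
        }
        return merged;
    }

    List<T> extractOutput(List<T> accumulator) {
        System.out.println("*** extractOutput");
        return accumulator;
    }

    public static void main(String[] args) {
        CombineToListSketch<String> fn = new CombineToListSketch<>();
        // Two partial accumulators, as a runner might build on separate workers
        List<String> a = fn.addInput(fn.createAccumulator(), "e1");
        List<String> b = fn.addInput(fn.addInput(fn.createAccumulator(), "e2"), "e3");
        System.out.println(fn.extractOutput(fn.mergeAccumulators(Arrays.asList(a, b))));
    }
}
```

If none of the trace lines appear in the real pipeline, the combine step is not being handed a completed window at all, which points at the windowing and triggering rather than at the CombineFn itself.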

Is the windowing configured incorrectly? We followed the example at https://beam.apache.org/documentation/programming-guide/#windowing.


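The fix that worked for us was to add an explicit trigger to the window. To window on processing time (rather than event time), we did the following: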

    .apply("Window", Window
        .<Event>into(new GlobalWindows())
        .triggering(Repeatedly
            .forever(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(5))
            )
        )
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes()
    )

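In effect, this puts all elements into the global window and uses a trigger that emits results 5 seconds (in processing time) after the first element of a pane is processed, then repeats for subsequent elements. The withAllowedLateness(Duration.ZERO) call sets the allowed lateness to zero, so late data is not waited for.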
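
To make the trigger behaviour in the snippet above more concrete, here is a simplified plain-Java model of how arrivals might be grouped into panes when each pane fires a fixed delay after its first element. It is not Beam code and ignores runner details; the class ProcessingTimePaneSketch, the method panes, and the choice to treat an arrival exactly at the firing time as the start of the next pane are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of "fire N ms after the first element in the pane, repeatedly,
// discarding fired panes". Hypothetical names; not part of the Beam API.
final class ProcessingTimePaneSketch {

    /** Groups sorted arrival times (ms, processing time) into the panes that would fire. */
    static List<List<Long>> panes(long[] arrivalsMillis, long delayMillis) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long fireAt = 0L;
        for (long arrival : arrivalsMillis) {
            // The open pane fires before this arrival if its deadline has passed
            if (!current.isEmpty() && arrival >= fireAt) {
                fired.add(current);
                current = new ArrayList<>(); // discarding mode: start from scratch
            }
            // The first element of a pane sets the firing deadline
            if (current.isEmpty()) {
                fireAt = arrival + delayMillis;
            }
            current.add(arrival);
        }
        // In a real stream the last pane would fire at its deadline; here we flush it
        if (!current.isEmpty()) {
            fired.add(current);
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] arrivals = {0L, 1_000L, 4_999L, 5_000L, 7_000L, 20_000L};
        // prints [[0, 1000, 4999], [5000, 7000], [20000]]
        System.out.println(panes(arrivals, 5_000L));
    }
}
```

Unlike fixed windows, the pane boundaries in this model depend on when elements arrive rather than on their timestamps, so output appears even if the watermark never advances.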


Source: https://habr.com/ru/post/1677300/
