This is a key use case for the new per-key-and-window state and timers features.
State is described in a Beam blog post, while for timers you will have to rely on the Javadoc. Never mind what the Javadoc says about which runners support them; the true status is found in the Beam capability matrix.
The pattern is very similar to what you wrote, but state lets it work with windows and also across bundles, which matters because bundles can be very small in streaming. Since state must be partitioned somehow to preserve parallelism, you need to add some sort of key. There is currently no automatic sharding for this.
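One way to add such a key, sketched here in plain Java as an assumption rather than as Beam API, is to hash each element into one of a fixed number of shards. In a pipeline, the resulting integer would become the key of a KV (for example via Beam's WithKeys transform) before the stateful ParDo. ShardKeys, shardFor, withShardKeys and numShards are hypothetical names.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: assigns each element one of numShards integer keys so that
// per-key state is split into numShards independent partitions.
final class ShardKeys {
  private ShardKeys() {}

  static int shardFor(String element, int numShards) {
    if (numShards <= 0) {
      throw new IllegalArgumentException("numShards must be positive");
    }
    // floorMod keeps the shard non-negative even when hashCode() is negative.
    return Math.floorMod(element.hashCode(), numShards);
  }

  // Pairs every element with its shard key, playing the role of a KV<Integer, String>.
  static List<Map.Entry<Integer, String>> withShardKeys(List<String> elements, int numShards) {
    List<Map.Entry<Integer, String>> keyed = new ArrayList<>();
    for (String element : elements) {
      keyed.add(new SimpleEntry<>(shardFor(element, numShards), element));
    }
    return keyed;
  }
}
```

A hash keeps equal elements on the same shard, while a random key would spread load more evenly. Either way, the shard count is a trade-off: each key-and-window pair keeps its own buffer, so more shards means more parallelism but smaller batches.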
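    // These imports go at the top of the enclosing file; Key is your own key type.
    import static com.google.common.base.MoreObjects.firstNonNull; // Guava

    import java.io.Serializable;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    private static final class Function extends DoFn<KV<Key, String>, Void> implements Serializable {
      private static final long serialVersionUID = 2417984990958377700L;

      private static final int LIMIT = 500;

      // Should match the allowed lateness of the input's windowing strategy.
      private final Duration allowedLateness;

      @StateId("bufferedSize")
      private final StateSpec<ValueState<Integer>> bufferedSizeSpec =
          StateSpecs.value(VarIntCoder.of());

      @StateId("buffered")
      private final StateSpec<BagState<String>> bufferedSpec =
          StateSpecs.bag(StringUtf8Coder.of());

      @TimerId("expiry")
      private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      Function(Duration allowedLateness) {
        this.allowedLateness = allowedLateness;
      }

      @ProcessElement
      public void processElement(
          ProcessContext context,
          BoundedWindow window,
          @StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
          @StateId("buffered") BagState<String> bufferedState,
          @TimerId("expiry") Timer expiryTimer) {

        int size = firstNonNull(bufferedSizeState.read(), 0);
        bufferedState.add(context.element().getValue());
        size += 1;
        bufferedSizeState.write(size);

        // Flush whatever is left once the window can no longer receive data.
        expiryTimer.set(window.maxTimestamp().plus(allowedLateness));

        if (size > LIMIT) {
          flush(context, bufferedState, bufferedSizeState);
        }
      }

      @OnTimer("expiry")
      public void onExpiry(
          OnTimerContext context,
          @StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
          @StateId("buffered") BagState<String> bufferedState) {
        flush(context, bufferedState, bufferedSizeState);
      }

      private void flush(
          WindowedContext context,
          BagState<String> bufferedState,
          ValueState<Integer> bufferedSizeState) {
        Iterable<String> buffered = bufferedState.read();

        // Build a batch request from `buffered` and send it to the external system
        // (specific to your sink, not shown here).

        // Reset the state for this key and window so the next batch starts empty.
        bufferedState.clear();
        bufferedSizeState.clear();
      }
    }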
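The control flow of this DoFn can be checked without a runner. The following is a plain-Java model, not Beam code: a map keyed by (key, window) stands in for the runner-scoped state, a Consumer stands in for the batch write, and onExpiry plays the role of the @OnTimer method. BatchingModel and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of the DoFn (not Beam code): one buffer per (key, window),
// flushed when it grows past the limit or when that window's expiry "timer" fires.
final class BatchingModel {
  private final int limit;
  private final Consumer<List<String>> sink; // stands in for the batch write
  private final Map<List<String>, List<String>> buffers = new HashMap<>();

  BatchingModel(int limit, Consumer<List<String>> sink) {
    this.limit = limit;
    this.sink = sink;
  }

  // Mirrors @ProcessElement: append, then flush if the buffer is over the limit.
  void processElement(String key, String window, String value) {
    List<String> stateKey = Arrays.asList(key, window);
    List<String> buffer = buffers.computeIfAbsent(stateKey, k -> new ArrayList<>());
    buffer.add(value);
    if (buffer.size() > limit) {
      flush(stateKey);
    }
  }

  // Mirrors @OnTimer("expiry"): write whatever is left for this key and window.
  void onExpiry(String key, String window) {
    flush(Arrays.asList(key, window));
  }

  // Removing the entry plays the role of clearing both state cells;
  // an empty buffer produces no write.
  private void flush(List<String> stateKey) {
    List<String> buffer = buffers.remove(stateKey);
    if (buffer != null && !buffer.isEmpty()) {
      sink.accept(buffer);
    }
  }
}
```

Because each (key, window) pair has its own buffer, elements from different windows never end up in the same batch, which is the property the instance-variable version could not guarantee.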
Taking a few notes here:
- State replaces your DoFn's instance variables, because instance variables are not tied to any particular key or window.
- The buffer and the size are simply initialized as needed, instead of in @StartBundle.
- BagState supports "blind" writes, so there is no need for any read-modify-write; new elements are just committed, the same way as when you output.
- Setting the timer repeatedly for the same time is fine; it should mostly be a no-op.
- @OnTimer("expiry") takes the place of @FinishBundle, since finishing a bundle is not a per-window concept but an artifact of how a runner executes your pipeline.
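The point about re-setting the timer can be illustrated with a toy model. It is hypothetical and not how any runner is implemented: each key-and-window id holds at most one pending expiry time, so setting it to the same time on every element leaves a single pending timer, which fires once when the watermark gets there. TimerModel, set and advanceWatermark are illustrative names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (hypothetical): at most one pending expiry timer per key-and-window id.
final class TimerModel {
  private final Map<String, Long> pending = new TreeMap<>();

  // Setting a timer overwrites any earlier target for the same id,
  // so repeating the same timestamp changes nothing.
  void set(String keyAndWindow, long timestamp) {
    pending.put(keyAndWindow, timestamp);
  }

  // Simplification: fire (and drop) every timer whose time the watermark has reached.
  List<String> advanceWatermark(long watermark) {
    List<String> fired = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> e = it.next();
      if (e.getValue() <= watermark) {
        fired.add(e.getKey());
        it.remove();
      }
    }
    return fired;
  }
}
```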
All that said, if you are writing to an external system, you may want to reify the windows and re-window into the global window before writing, in cases where the way you write depends on the window, since "the external world is globally windowed."
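In Beam this could roughly be Reify.windows() followed by Window.into(new GlobalWindows()), after which each element carries its original window as ordinary data. The plain-Java sketch below models only the last step, choosing a write target from the reified window; the ReifiedElement class, the targetTable method and the one-table-per-day naming scheme are all hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical: an element after its window has been turned into plain data.
final class ReifiedElement {
  final String value;
  final Instant windowStart; // start of the window the element was originally in

  ReifiedElement(String value, Instant windowStart) {
    this.value = value;
    this.windowStart = windowStart;
  }
}

final class WindowedWrites {
  private WindowedWrites() {}

  // Hypothetical naming scheme: one table per day-long window, in UTC.
  private static final DateTimeFormatter DAY =
      DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

  // Decide where the element goes using only data carried by the element itself.
  static String targetTable(String baseName, ReifiedElement element) {
    return baseName + "_" + DAY.format(element.windowStart);
  }
}
```

Once the window is part of the element, the write step no longer depends on the element's windowing, which fits an external system that has no notion of Beam windows.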