Apache Beam Buffer and Data Streams

I have a streaming job that will process a large amount of data the first time it starts. One of my DoFns calls a remote service that supports batch requests, so when working with bounded collections I use the following approach:

    private static final class Function extends DoFn<String, Void> implements Serializable {
        private static final long serialVersionUID = 2417984990958377700L;
        private static final int LIMIT = 500;
        private transient Queue<String> buffered;

        @StartBundle
        public void startBundle(Context context) throws Exception {
            buffered = new LinkedList<>();
        }

        @ProcessElement
        public void processElement(ProcessContext context) throws Exception {
            buffered.add(context.element());
            if (buffered.size() > LIMIT) {
                flush();
            }
        }

        @FinishBundle
        public void finishBundle(Context c) throws Exception {
            // process remaining
            flush();
        }

        private void flush() {
            // build batch request
            while (!buffered.isEmpty()) {
                buffered.poll();
                // do something
            }
        }
    }

Is there a way to window the data so that the same approach can be used for unbounded collections?

I tried the following:

    pipeline
        .apply("Read", Read.from(source))
        .apply(WithTimestamps.of(input -> Instant.now()))
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2L))))
        .apply("Process", ParDo.of(new Function()));

but startBundle and finishBundle are called for each element. Is there a way to get something like what RxJava offers (2-minute windows or 100-element batches, whichever comes first):

    source
        .toFlowable(BackpressureStrategy.LATEST)
        .buffer(2, TimeUnit.MINUTES, 100)
2 answers

This is a quintessential use case for per-key-and-window state and timers.

State is described in a Beam blog post, while for timers you will have to rely on the Javadoc. Never mind what the Javadoc says about which runners support them; the true status is found in the Beam capability matrix.

The pattern is very similar to what you have written, but state allows it to work with windows and also across bundles, since bundles can be very small in streaming. Because state must be partitioned somehow to maintain parallelism, you will need to add some sort of key. Currently there is no automatic sharding for this.
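As a rough illustration of "adding some sort of key", here is a minimal plain-Java sketch that assigns each element to one of a fixed number of shards by hash. The class `ShardKeys`, its methods, and the shard count are all hypothetical names introduced for this example, not part of Beam; in a pipeline the resulting pair would correspond to the `KV<Key, String>` input of the stateful DoFn.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/** Hypothetical helper (not a Beam API): maps elements onto a fixed number of shard keys. */
final class ShardKeys {
    private ShardKeys() {}

    /** Returns a shard index in [0, numShards) derived from the element's hash code. */
    static int shardFor(String element, int numShards) {
        if (numShards <= 0) {
            throw new IllegalArgumentException("numShards must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(element.hashCode(), numShards);
    }

    /** Pairs the element with its shard key, analogous to a keyed (key, value) input. */
    static Map.Entry<Integer, String> withShardKey(String element, int numShards) {
        return new SimpleImmutableEntry<>(shardFor(element, numShards), element);
    }
}
```

The shard count is a trade-off: more shards allow more parallelism, but each shard's buffer fills more slowly, so batches flushed by the timer tend to be smaller.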

    private static final class Function extends DoFn<KV<Key, String>, Void> implements Serializable {
        private static final long serialVersionUID = 2417984990958377700L;
        private static final int LIMIT = 500;

        // Per-key-and-window count of buffered elements.
        @StateId("bufferedSize")
        private final StateSpec<Object, ValueState<Integer>> bufferedSizeSpec =
            StateSpecs.value(VarIntCoder.of());

        // Per-key-and-window buffer of pending elements.
        @StateId("buffered")
        private final StateSpec<Object, BagState<String>> bufferedSpec =
            StateSpecs.bag(StringUtf8Coder.of());

        // Event-time timer that flushes whatever is left when the window expires.
        @TimerId("expiry")
        private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(
                ProcessContext context,
                BoundedWindow window,
                @StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
                @StateId("buffered") BagState<String> bufferedState,
                @TimerId("expiry") Timer expiryTimer) {
            // firstNonNull is a null-default helper, e.g. Guava's MoreObjects.firstNonNull.
            int size = firstNonNull(bufferedSizeState.read(), 0);
            bufferedState.add(context.element().getValue());
            size += 1;
            bufferedSizeState.write(size);

            // allowedLateness is not defined in this snippet; it is assumed to be a
            // Duration matching the allowed lateness of your windowing strategy.
            expiryTimer.set(window.maxTimestamp().plus(allowedLateness));

            if (size > LIMIT) {
                flush(context, bufferedState, bufferedSizeState);
            }
        }

        @OnTimer("expiry")
        public void onExpiry(
                OnTimerContext context,
                @StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
                @StateId("buffered") BagState<String> bufferedState) {
            flush(context, bufferedState, bufferedSizeState);
        }

        private void flush(
                Context context,
                BagState<String> bufferedState,
                ValueState<Integer> bufferedSizeState) {
            Iterable<String> buffered = bufferedState.read();

            // build batch request from buffered
            ...

            // clear things
            bufferedState.clear();
            bufferedSizeState.clear();
        }
    }

A few notes here:

  • State replaces your DoFn's instance variables, since instance variables have no cohesion across windows.
  • The buffer and the size are simply initialized as needed instead of in @StartBundle.
  • BagState supports "blind" writes, so there is no need for any read-modify-write; new elements are just committed in the same way as when you output.
  • Setting the timer repeatedly for the same time is fine; it should be mostly a no-op.
  • @OnTimer("expiry") takes the place of @FinishBundle, since finishing a bundle is not a per-window thing but an artifact of how a runner executes your pipeline.
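To make the control flow in the notes above easier to follow, here is a plain-Java model of the same "flush on count or on expiry" logic with no Beam dependency. It is only a sketch: `BatchBuffer`, its `sink` callback, and the in-memory list standing in for BagState are assumptions made for illustration, and a real runner would keep one such buffer per key and window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model (not Beam code) of a buffer flushed on size or on timer expiry. */
final class BatchBuffer {
    private final int limit;
    private final Consumer<List<String>> sink;               // hypothetical batch-request callback
    private final List<String> buffered = new ArrayList<>(); // stands in for BagState

    BatchBuffer(int limit, Consumer<List<String>> sink) {
        this.limit = limit;
        this.sink = sink;
    }

    /** Mirrors processElement: append, then flush once the size exceeds the limit. */
    void add(String element) {
        buffered.add(element);
        if (buffered.size() > limit) {
            flush();
        }
    }

    /** Mirrors the expiry timer callback: flush whatever is still buffered. */
    void onExpiry() {
        flush();
    }

    private void flush() {
        // Skipping empty flushes is an addition of this sketch, not part of the answer's code.
        if (buffered.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffered)); // hand a copy to the batch request
        buffered.clear();                       // "clear things"
    }
}
```

Note that, as in the code above, the comparison is `size > LIMIT`, so a count-triggered batch holds LIMIT + 1 elements; use `>=` if the remote service enforces a hard cap.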

All that said, if you are writing to an external system, you may want to reify the windows and re-window into the global window before doing writes whose manner depends on the window, because "the external world is globally windowed."


The documentation for Apache Beam 0.6.0 says that StateId "is not currently supported by any runner."


Source: https://habr.com/ru/post/1265714/