Detecting key state changes

I am new to the Dataflow programming model and am experiencing problems with what, in my opinion, should be a simple use case:

I have a pipeline that reads live data from Pub/Sub. The data contains device states with (simplified) a serial number and a state (UP or DOWN). Each device is guaranteed to send its state at least every 5 minutes but, of course, a device can send the same state several times.

What I'm trying to achieve is a pipeline that only emits state changes for a device, so basically it tracks some notion of "last state per key" and compares new events with that.

Is there a good way to do this at the moment?

1 answer

There is a related question, "Delete duplicates between triggers / window firing", but your question raises some subtleties of its own. So let me treat two aspects separately and refer to parts of the related question where they apply.

1. Taking the last input value

Here your question differs in that the desired output is explicitly not the result of an associative and commutative Combine operation. This matters because input is unordered in Dataflow and Beam: elements just carry timestamps, so that we can reason about them in event time.
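To make the point about unordered input concrete, here is a minimal plain-Java sketch (no Beam dependency) of picking the latest reading by event timestamp rather than by arrival order. The `Reading` class and the `later`/`latest` helpers are hypothetical names introduced for illustration, and the sketch assumes timestamps are distinct per device; with distinct timestamps the result is the same whatever order the readings arrive in.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class LatestByTimestamp {

  /** Hypothetical (timestamp, state) pair; not a Beam type. */
  static final class Reading {
    final long timestampMillis;
    final String state;

    Reading(long timestampMillis, String state) {
      this.timestampMillis = timestampMillis;
      this.state = state;
    }
  }

  /** Returns the reading with the larger event timestamp (ties keep a). */
  static Reading later(Reading a, Reading b) {
    return b.timestampMillis > a.timestampMillis ? b : a;
  }

  /** Folds a non-empty list of readings in the order given. */
  static Reading latest(List<Reading> readings) {
    Reading best = readings.get(0);
    for (Reading r : readings) {
      best = later(best, r);
    }
    return best;
  }

  public static void main(String[] args) {
    List<Reading> readings = new ArrayList<>();
    readings.add(new Reading(1000L, "UP"));
    readings.add(new Reading(3000L, "DOWN"));
    readings.add(new Reading(2000L, "UP"));

    Reading inArrivalOrder = latest(readings);
    Collections.reverse(readings);
    Reading inReverseOrder = latest(readings);

    // Both folds pick the same reading, the one with the largest timestamp.
    System.out.println(inArrivalOrder.state + " " + inReverseOrder.state);
  }
}
```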

(timestamp, UP/DOWN) , , . , , .

, , GroupByKey ( ). , , .

, " ".

2.

. , , . .

, , , (timestamp, CHANGE/NO_CHANGE, UP/DOWN), NO_CHANGE CHANGE. , NO_CHANGE. UP DOWN .

" , ", , , Apache Beam, Dataflow 2.x.

A stateful DoFn along these lines:

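import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

new DoFn<KV<DeviceId, UpDown>, KV<DeviceId, UpDown>>() {

  // Timestamp of the most recently emitted state change for this key.
  @StateId("latestTimestamp")
  private final StateSpec<ValueState<Instant>> latestTimestampSpec =
      StateSpecs.value(InstantCoder.of());

  // Most recently emitted state (UP or DOWN) for this key.
  @StateId("latestOutput")
  private final StateSpec<ValueState<UpDown>> latestOutputSpec =
      StateSpecs.value(UpDown.getCoder());

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("latestTimestamp") ValueState<Instant> latestTimestampState,
      @StateId("latestOutput") ValueState<UpDown> latestOutputState) {

    // Both reads return null for the first element seen for a key.
    Instant latestTimestamp = latestTimestampState.read();
    UpDown latestOutput = latestOutputState.read();
    Instant newTimestamp = c.timestamp();
    UpDown newValue = c.element().getValue();

    // Emit only if the element is newer than the last emitted change
    // and carries a different state. The stored timestamp advances
    // only when a change is emitted.
    boolean isNewer =
        latestTimestamp == null || newTimestamp.isAfter(latestTimestamp);
    if (isNewer && !newValue.equals(latestOutput)) {
      c.output(KV.of(c.element().getKey(), newValue));
      latestTimestampState.write(newTimestamp);
      latestOutputState.write(newValue);
    }
  }
}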
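The per-key comparison in the DoFn above can be tried out without a runner. The following is only a sketch of that logic in plain Java, not Beam code: the `ChangeDetector` class and its `offer` method are hypothetical names, and two in-memory maps stand in for the per-key `ValueState` cells.

```java
import java.util.HashMap;
import java.util.Map;

final class ChangeDetector {

  // Stand-ins for the per-key "latestTimestamp" and "latestOutput" state.
  private final Map<String, Long> latestTimestamp = new HashMap<>();
  private final Map<String, String> latestOutput = new HashMap<>();

  /** Returns true when (key, state) would be emitted as a state change. */
  boolean offer(String key, long timestampMillis, String state) {
    Long lastTimestamp = latestTimestamp.get(key);
    String lastState = latestOutput.get(key);

    // The first element for a key has no stored timestamp and counts as newer.
    boolean isNewer = lastTimestamp == null || timestampMillis > lastTimestamp;
    if (isNewer && !state.equals(lastState)) {
      latestTimestamp.put(key, timestampMillis);
      latestOutput.put(key, state);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    ChangeDetector detector = new ChangeDetector();
    System.out.println(detector.offer("dev-1", 1000L, "UP"));   // first state: emitted
    System.out.println(detector.offer("dev-1", 2000L, "UP"));   // repeat: suppressed
    System.out.println(detector.offer("dev-1", 3000L, "DOWN")); // change: emitted
    System.out.println(detector.offer("dev-1", 2500L, "UP"));   // older than last change: suppressed
  }
}
```

As in the DoFn, the stored timestamp moves forward only when a change is emitted, so a late element is compared against the last emitted change rather than against the newest element seen.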

- , Beam. , .


Source: https://habr.com/ru/post/1648604/

