There is a related question, "Delete duplicates between triggers / window firing", but your question raises some subtleties that make it different. So let me treat two aspects separately and defer some parts to the related question.
1. Taking the last input value
Here your question differs in that it clearly is not outputting the result of an associative and commutative Combine operation. This matters because input is unordered in Dataflow and Beam: elements just carry timestamps so that we can reason about them in event time.
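To make the ordering point concrete, here is a minimal plain-Java sketch with no Beam dependency. The `Reading` type, its fields, and the sample values are hypothetical and chosen only for illustration. It shows that the element that arrives last is not necessarily the latest one in event time, so "latest" has to be decided by comparing timestamps.

```java
import java.util.Arrays;
import java.util.List;

final class LatestByEventTime {
    /** Hypothetical element: an event-time timestamp (millis) plus a status. */
    static final class Reading {
        final long timestampMillis;
        final String status; // "UP" or "DOWN"

        Reading(long timestampMillis, String status) {
            this.timestampMillis = timestampMillis;
            this.status = status;
        }
    }

    /** Returns the reading with the greatest timestamp, whatever the arrival order. */
    static Reading latest(List<Reading> arrivals) {
        Reading best = null;
        for (Reading r : arrivals) {
            if (best == null || r.timestampMillis > best.timestampMillis) {
                best = r;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Arrival order differs from event-time order: the reading at t=20 arrives last.
        List<Reading> arrivals = Arrays.asList(
                new Reading(10, "UP"),
                new Reading(30, "DOWN"),
                new Reading(20, "UP"));

        Reading lastArrived = arrivals.get(arrivals.size() - 1);
        System.out.println("last to arrive:       " + lastArrived.status);      // UP
        System.out.println("latest in event time: " + latest(arrivals).status); // DOWN
    }
}
```

Picking the maximum by timestamp gives the same answer for any arrival order, which is why it is safe to apply to unordered input.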
(timestamp, UP/DOWN) , , . , , .
, , GroupByKey ( ). , , .
2. Suppressing output when the value has not changed
, , , (timestamp, CHANGE/NO_CHANGE, UP/DOWN), NO_CHANGE CHANGE. , NO_CHANGE. UP DOWN .
" , ", , , Apache Beam, Dataflow 2.x.
The DoFn might look like this:
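    // Imports needed at the top of the enclosing file (DeviceId and UpDown are your own types):
    import org.apache.beam.sdk.coders.InstantCoder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Instant;

    new DoFn<KV<DeviceId, UpDown>, KV<DeviceId, UpDown>>() {

      // Event-time timestamp of the newest element seen so far for this key.
      @StateId("latestTimestamp")
      private final StateSpec<ValueState<Instant>> latestTimestampSpec =
          StateSpecs.value(InstantCoder.of());

      // Last value that was emitted for this key.
      @StateId("latestOutput")
      private final StateSpec<ValueState<UpDown>> latestOutputSpec =
          StateSpecs.value(UpDown.getCoder());

      @ProcessElement
      public void processElement(
          ProcessContext c,
          @StateId("latestTimestamp") ValueState<Instant> latestTimestampState,
          @StateId("latestOutput") ValueState<UpDown> latestOutputState) {
        Instant latestTimestamp = latestTimestampState.read(); // null for the first element of a key
        UpDown latestOutput = latestOutputState.read();        // null until something is emitted
        Instant newTimestamp = c.timestamp();                  // the element's event-time timestamp
        UpDown newValue = c.element().getValue();

        // Ignore elements that are not newer than one already seen.
        if (latestTimestamp != null && !newTimestamp.isAfter(latestTimestamp)) {
          return;
        }
        // Advance the timestamp even when the value is unchanged, so that a
        // late, older element cannot override a newer one.
        latestTimestampState.write(newTimestamp);

        // Emit only when the value differs from the last one emitted.
        if (!newValue.equals(latestOutput)) {
          c.output(KV.of(c.element().getKey(), newValue));
          latestOutputState.write(newValue);
        }
      }
    }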
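To see how per-key change suppression behaves on out-of-order input, here is a plain-Java simulation with no Beam dependency. It is only a sketch: the two maps stand in for the per-key state cells, and the class name, method names, and sample values are hypothetical. It makes two assumptions of its own: the first element for a key counts as a change, and the stored timestamp advances on every newer element, even when nothing is emitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChangeFilterSketch {
    // Per-key state, standing in for the "latestTimestamp" and "latestOutput" cells.
    private final Map<String, Long> latestTimestamp = new HashMap<>();
    private final Map<String, String> latestOutput = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    /** Processes one element; records it in emitted() only if it is newer and changes the value. */
    void process(String key, long timestampMillis, String value) {
        Long seen = latestTimestamp.get(key);
        if (seen != null && timestampMillis <= seen) {
            return; // not newer than something already seen: ignore
        }
        latestTimestamp.put(key, timestampMillis);
        if (!value.equals(latestOutput.get(key))) {
            emitted.add(key + "=" + value + "@" + timestampMillis);
            latestOutput.put(key, value);
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        ChangeFilterSketch filter = new ChangeFilterSketch();
        filter.process("dev1", 10, "UP");   // first element for the key: emitted
        filter.process("dev1", 20, "UP");   // newer but unchanged: suppressed
        filter.process("dev1", 15, "DOWN"); // older than t=20: ignored
        filter.process("dev1", 30, "DOWN"); // newer and different: emitted
        System.out.println(filter.emitted()); // [dev1=UP@10, dev1=DOWN@30]
    }
}
```

Note the third call: if the timestamp were advanced only when a value is emitted, the late DOWN at t=15 would be compared against t=10 and emitted, even though the device was already known to be UP at t=20.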
- , Beam. , .