Long-lived state with Google Dataflow

Just trying to wrap my head around the programming model here. Scenario: I'm using Pub/Sub + Dataflow to build analytics for a web forum. I have a stream of data coming from Pub/Sub that looks like this:

ID | TS | EventType
1  | 1  | Create
1  | 2  | Comment
2  | 2  | Create
1  | 4  | Comment

And I want to end up with a stream coming out of Dataflow that looks like this:

ID | TS | num_comments
1  | 1  | 0
1  | 2  | 1
2  | 2  | 0
1  | 4  | 2

I want the job that computes this rollup to run as a streaming process, with new counts being populated as new events arrive. My question is: where is the idiomatic place for the job to store the state for the current topic IDs and comment counts? Assuming that topics can live for years. Current ideas:

  • Write a "current" entry per topic ID to BigTable, and in a DoFn look up the current comment count for the incoming topic ID. Even as I write this I'm not a fan (a rough sketch of this follows the list).
  • Use side inputs somehow? This sounds like it might be the answer, but if so, I don't quite see how.
  • Run a streaming job with a global window and a trigger that fires on every record, and rely on Dataflow to keep the entire pane history. (Unbounded storage requirement?)
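For concreteness, here is a rough sketch of what the first idea could look like, using the Cloud Bigtable HBase client inside a DoFn. The Event POJO, project/instance names, and the topic_counts table with its c:count column are all hypothetical placeholders, and the sketch assumes one Bigtable row per topic:

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    import com.google.cloud.bigtable.hbase.BigtableConfiguration;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.values.KV;

    // Hypothetical event POJO -- stands in for whatever you decode from
    // Pub/Sub (you would register a Coder for it in a real pipeline).
    class Event implements Serializable {
      String topicId;
      String type; // "Create" or "Comment"
    }

    class CommentCountFn extends DoFn<Event, KV<String, Long>> {
      private static final byte[] FAMILY = Bytes.toBytes("c");
      private static final byte[] QUALIFIER = Bytes.toBytes("count");

      private transient Connection connection;
      private transient Table table;

      @Override
      public void startBundle(Context c) throws IOException {
        // Placeholder project/instance/table names.
        connection = BigtableConfiguration.connect("my-project", "my-instance");
        table = connection.getTable(TableName.valueOf("topic_counts"));
      }

      @Override
      public void processElement(ProcessContext c) throws IOException {
        Event event = c.element();
        byte[] row = Bytes.toBytes(event.topicId);
        long count;
        if ("Comment".equals(event.type)) {
          // Atomic server-side read-modify-write of the stored count.
          count = table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L);
        } else {
          // "Create": emit whatever is stored, 0 for a brand-new topic.
          Result result = table.get(new Get(row));
          byte[] cell = result.getValue(FAMILY, QUALIFIER);
          count = (cell == null) ? 0L : Bytes.toLong(cell);
        }
        c.output(KV.of(event.topicId, count));
      }

      @Override
      public void finishBundle(Context c) throws IOException {
        table.close();
        connection.close();
      }
    }

One reason to dislike this, beyond the extra round-trips per element: Dataflow can retry bundles, so the increment may be applied more than once for the same event unless writes are made idempotent.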

EDIT: Just to clarify, I wouldn't have any trouble implementing any of these three strategies, or a million other ways of doing it; I'm more interested in the best way of doing this with Dataflow. What will be most resilient to failure, to having to re-process history for backfills, etc., etc.

EDIT 2: There is currently a bug in the Dataflow service where updates fail if you add inputs to a Flatten transform. This means that if you change a job in a way that adds something to a Flatten operation, you have to drop and rebuild any state the job has accrued.

2 answers

You can use triggers and Combine transforms to accomplish this.

    PCollection<ID> comments = /* IDs from the source */;
    PCollection<KV<ID, Long>> commentCounts = comments
        // Produce speculative results by triggering as data comes in.
        // Note that this won't trigger after *every* element, but it will
        // trigger relatively quickly (as the system divides incoming data
        // into work units). You could also throttle this with something
        // like:
        //   AfterProcessingTime.pastFirstElementInPane()
        //       .plusDelayOf(Duration.standardMinutes(5))
        // which will produce output every 5 minutes.
        .apply(Window.<ID>triggering(
                Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .accumulatingFiredPanes())
        // Count the occurrences of each ID.
        .apply(Count.<ID>perElement());

    // Produce an output String -- in your use case you'd want to produce
    // a row and write it to the appropriate sink.
    commentCounts.apply(ParDo.of(new DoFn<KV<ID, Long>, String>() {
      @Override
      public void processElement(ProcessContext c) {
        KV<ID, Long> element = c.element();
        // The PaneInfo includes details about the pane of the window
        // being processed, including a strictly increasing index of the
        // number of panes that have been produced for this key.
        PaneInfo pane = c.pane();
        c.output(element.getKey() + " | " + pane.getIndex()
            + " | " + element.getValue());
      }
    }));

Depending on your data, you could also read the whole comments from the source, extract the ID, and then use Count.perKey() to get the counts for each ID. If you need a more sophisticated combination, you could look at defining a custom CombineFn and using Combine.perKey.
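For illustration, a minimal sketch of such a custom CombineFn that counts only Comment events per topic, ignoring Create events (assuming the same hypothetical Event POJO as in the sketch in the question above):

    import com.google.cloud.dataflow.sdk.transforms.Combine;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    // Counts only "Comment" events per key; Event is the hypothetical
    // POJO sketched in the question above.
    class CountComments extends Combine.CombineFn<Event, Long, Long> {
      @Override
      public Long createAccumulator() {
        return 0L;
      }

      @Override
      public Long addInput(Long accumulator, Event event) {
        return "Comment".equals(event.type) ? accumulator + 1 : accumulator;
      }

      @Override
      public Long mergeAccumulators(Iterable<Long> accumulators) {
        long sum = 0;
        for (Long partial : accumulators) {
          sum += partial;
        }
        return sum;
      }

      @Override
      public Long extractOutput(Long accumulator) {
        return accumulator;
      }
    }

    // Usage: given events keyed by topic ID.
    PCollection<KV<String, Event>> keyedEvents = /* ... */;
    PCollection<KV<String, Long>> commentCounts =
        keyedEvents.apply(Combine.perKey(new CountComments()));

Combine.perKey uses the same CombineFn both for partial pre-combining on each worker and for merging those partials later, which is why the combining logic must be associative and commutative.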


Since BigQuery does not support overwriting rows, one way to go about this is to write the events to BigQuery and query the data using COUNT:

SELECT ID, COUNT(num_comments) FROM table GROUP BY ID;

You can also aggregate num_comments per window in Dataflow before writing the records to BigQuery; the query above still works, except that COUNT(num_comments) becomes SUM(num_comments) once each row holds a partial count rather than a single event.
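A minimal sketch of that pre-aggregation, assuming one-minute fixed windows and a hypothetical my-project:forum.comment_counts table (both the window size and the table name are placeholders):

    import java.util.Arrays;

    import org.joda.time.Duration;

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.dataflow.sdk.io.BigQueryIO;
    import com.google.cloud.dataflow.sdk.transforms.Count;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
    import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    PCollection<String> topicIds = /* one topic ID per comment event */;

    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("ID").setType("STRING"),
        new TableFieldSchema().setName("num_comments").setType("INTEGER")));

    topicIds
        // Emit one partial count per topic per minute instead of one
        // row per event.
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Count.<String>perElement())
        .apply(ParDo.of(new DoFn<KV<String, Long>, TableRow>() {
          @Override
          public void processElement(ProcessContext c) {
            c.output(new TableRow()
                .set("ID", c.element().getKey())
                .set("num_comments", c.element().getValue()));
          }
        }))
        // Streaming inserts append rows; nothing is overwritten.
        .apply(BigQueryIO.Write
            .to("my-project:forum.comment_counts")
            .withSchema(schema));

Larger windows mean fewer rows accumulating in BigQuery, at the cost of staler counts.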

