Apache Beam in Dataflow: large side input

This is most similar to this question.

I am creating a pipeline in Dataflow 2.x that takes streaming input from a Pubsub queue. Every message that comes in needs to be joined against a very large dataset that comes from Google BigQuery and have all the relevant values attached to it (based on a key) before being written to a database.

The problem is that the mapping dataset from BigQuery is very large. Every attempt to use it as a side input fails, with the Dataflow runners throwing the error "java.lang.IllegalArgumentException: ByteString would be too long". I have tried the following strategies:

1) Side input

  • As stated, the mapping data is (apparently) too large for this. If I am wrong here, or there is a workaround, please let me know, because this would be the simplest solution.

2) Key-value pair mapping

  • In this strategy, I read the BigQuery data and the Pubsub message data in the first part of the pipeline, then run each through ParDo transforms that turn every value in the PCollections into KeyValue pairs. Then I run a Merge.Flatten transform and a GroupByKey transform to attach the relevant mapping data to each message.
  • The problem is that streaming data must be windowed before it can be merged with other data, so I have to apply windowing to the large, bounded BigQuery data as well. The windowing strategies also have to be the same on both datasets. But no windowing strategy makes sense for the bounded data, and the few windowing attempts I made simply sent all the BQ data in a single window and then never sent it again. It needs to be joined with every incoming pubsub message.

3) Calling BQ directly in a ParDo (DoFn)

  • This seemed like a good idea: have each worker declare a static instance of the map data, and if it is not there yet, call BigQuery directly to get it. Unfortunately, this throws internal errors from BigQuery every time (as in, the entire message just says "Internal error"). Filing a support ticket with Google resulted in them telling me that, essentially, "you can't do that".
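
For readers unfamiliar with the idea in strategy 3, here is a minimal plain-Java sketch of "one static instance per worker, loaded on first use". It is not the asker's code: `LazyLookupSketch` and the `loader` supplier are hypothetical names, and the in-memory map in `main` merely stands in for whatever the BigQuery read would return.

```java
import java.util.Map;
import java.util.function.Supplier;

final class LazyLookupSketch {
    // Shared by every thread in the worker JVM; volatile so the finished map is published safely.
    private static volatile Map<String, String> cache;

    /** Returns the shared map, invoking the loader only if no thread has loaded it yet. */
    static Map<String, String> get(Supplier<Map<String, String>> loader) {
        Map<String, String> local = cache;
        if (local == null) {
            synchronized (LazyLookupSketch.class) {
                local = cache;
                if (local == null) {
                    // Hypothetical loader: in the question this would be the direct BigQuery read.
                    local = Map.copyOf(loader.get());
                    cache = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        Supplier<Map<String, String>> loader = () -> {
            System.out.println("loading lookup data");
            return Map.of("k1", "v1");
        };
        System.out.println(get(loader).get("k1"));
        System.out.println(get(loader).get("k1")); // second call reuses the cached map
    }
}
```

This only limits loading to once per JVM. The full dataset still has to fit in each worker's memory, and the sketch does nothing about the BigQuery "Internal error" described above.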

It seems that this task does not really fit the "embarrassingly parallel" model, so am I barking up the wrong tree here?

EDIT:

Even when using a high-memory machine in Dataflow and trying to turn the side input into a map view, I get the error java.lang.IllegalArgumentException: ByteString would be too long

Here is a (pseudo) example of the code I am using:

  Pipeline pipeline = Pipeline.create(options);

  PCollectionView<Map<String, TableRow>> mapData = pipeline
      .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
      .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn()))
      .apply(View.asMap());

  PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
      .fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));

  messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
          JSONObject data = new JSONObject(new String(c.element().getPayload()));
          String key = getKeyFromData(data);
          TableRow sideInputData = c.sideInput(mapData).get(key);
          if (sideInputData != null) {
              LOG.info("holyWowItWOrked");
              c.output(new TableRow());
          } else {
              LOG.info("noSideInputDataHere");
          }
      }
  }).withSideInputs(mapData));

The pipeline throws the exception and fails before logging anything from within the ParDo.

Stack trace:

  java.lang.IllegalArgumentException: ByteString would be too long: 644959474+1551393497
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.concat(ByteString.java:524)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:576)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.copyFrom(ByteString.java:559)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString$Output.toByteString(ByteString.java:1006)
    com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
    com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
    com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951)
    com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
    com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
    com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
1 answer

Check out the "Pattern: Streaming mode large lookup tables" section of this article: https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-2 (this may be the only viable solution, since your side input does not fit in memory):

Description:

A large (in GBs) lookup table must be accurate, and either changes often or does not fit in memory.

Example:

You have point-of-sale information from a retailer and need to associate the name of the product item with the data record that contains the product ID. There are hundreds of thousands of items stored in an external database that can change constantly. Also, all elements must be processed using the correct value.

Solution:

Use the "Calling external services for data enrichment" pattern, but instead of calling a microservice, call a read-optimized NoSQL database (such as Cloud Datastore or Cloud Bigtable) directly.

For each value to be looked up, create a key-value pair using the KV utility class. Do a GroupByKey to create batches of the same key type for the call against the database. In the DoFn, call the database for that key, and then apply the returned value to all the values by walking through the iterable. Follow the best practices for client instantiation described in "Calling external services for data enrichment".
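
The batching step described above can be sketched in plain Java, without the Beam SDK, to show its shape: group the pending messages by key, make one lookup per distinct key, and apply the result to every message in that group. All names here (`BatchedLookupSketch`, `Message`, `enrich`) are hypothetical, and the in-memory `Map` in `main` is only a stand-in for a Cloud Bigtable or Cloud Datastore client. In a real pipeline, `GroupByKey` and a `DoFn` would take the place of `groupByKey` and `enrich`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class BatchedLookupSketch {

    /** A message waiting to be enriched; key is the join key extracted from its payload. */
    static final class Message {
        final String key;
        final String payload;
        Message(String key, String payload) { this.key = key; this.payload = payload; }
    }

    /** Plain-Java stand-in for GroupByKey: collects the payloads that share a key. */
    static Map<String, List<String>> groupByKey(List<Message> messages) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (Message m : messages) {
            grouped.computeIfAbsent(m.key, k -> new ArrayList<>()).add(m.payload);
        }
        return grouped;
    }

    /** One lookup per distinct key; the result is applied to every payload in that group. */
    static List<String> enrich(List<Message> messages, Function<String, String> lookup) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> group : groupByKey(messages).entrySet()) {
            String value = lookup.apply(group.getKey()); // single call to the store for this key
            for (String payload : group.getValue()) {
                out.add(payload + " -> " + value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical in-memory table standing in for the external NoSQL database.
        Map<String, String> table = Map.of("p1", "Widget", "p2", "Gadget");
        List<Message> batch = List.of(
            new Message("p1", "sale-1"),
            new Message("p2", "sale-2"),
            new Message("p1", "sale-3"));
        enrich(batch, table::get).forEach(System.out::println);
    }
}
```

The point of grouping first is that three messages with two distinct keys cost two database calls rather than three. The lookup table itself never has to be loaded into worker memory.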

Other relevant patterns are described in this article: https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1

  • Pattern: Slowly-changing lookup cache
  • Pattern: Calling external services for data enrichment

Source: https://habr.com/ru/post/1273692/

