This is most similar to this question.
I am creating a pipeline in Dataflow 2.x that takes streaming input from a Pubsub queue. Every single message that comes in needs to be passed through a very large dataset that comes from Google BigQuery and have all of the relevant values attached to it (based on a key) before being written to a database.
The problem is that the mapping dataset from BigQuery is very large - any attempt to use it as a side input fails with the Dataflow runners throwing the error "java.lang.IllegalArgumentException: ByteString would be too long". I have attempted the following strategies:
1) Side input
- As noted, the mapping data is (apparently) too large for this. If I'm wrong here, or there is a workaround, please let me know, since this would be the simplest solution.
2) Mapping of key/value pairs
- In this strategy, I read the BigQuery data and the Pubsub message data in the first part of the pipeline, then run each through ParDo transforms that convert every value in both PCollections into KeyValue pairs. Then I run a Merge.Flatten transform and a GroupByKey transform to attach the relevant mapping data to each message.
- The problem is that streaming data requires windowing in order to be merged with other data, so I have to apply windowing to the large, bounded BigQuery data as well. It also requires that the windowing strategies be the same on both datasets. But no windowing strategy makes sense for the bounded data, and the few windowing attempts I made just sent all the BQ data in a single window and then never sent it again. It needs to be joined with every incoming pubsub message. (A rough sketch of what this attempt looked like is included after this list.)
3) Call BQ directly in ParDo (DoFn)
- This seemed like a good idea - have each worker declare a static instance of the map data, and if it isn't there, call BigQuery directly to fetch it. Unfortunately, this throws internal errors from BigQuery every time (the entire message is literally just "Internal error"). Filing a support ticket with Google resulted in them telling me that, essentially, "you can't do that." (A sketch of this attempt also follows below.)
It seems like this task really doesn't fit the "embarrassingly parallelizable" model, so am I barking up the wrong tree here?
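Roughly, the shape of the second attempt was the sketch below. The KeyBqRowFn and KeyPubsubMessageFn DoFns (which would emit KV<String, TableRow>) and the one-minute fixed window are illustrative placeholders, not the exact code I ran:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;

// Key both datasets the same way and give both the same windowing strategy.
PCollection<KV<String, TableRow>> bqKeyed = pipeline
    .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
    .apply("KeyBqRows", ParDo.of(new KeyBqRowFn()))
    .apply(Window.<KV<String, TableRow>>into(FixedWindows.of(Duration.standardMinutes(1))));

PCollection<KV<String, TableRow>> pubsubKeyed = pipeline
    .apply("ReadMessages", PubsubIO.readMessages()
        .fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)))
    .apply("KeyMessages", ParDo.of(new KeyPubsubMessageFn()))
    .apply(Window.<KV<String, TableRow>>into(FixedWindows.of(Duration.standardMinutes(1))));

// Flatten the two keyed collections and group by key so each message ends up
// grouped with the BQ rows that share its key -- but the bounded BQ data only
// ever shows up in one window, so later messages never see it.
PCollection<KV<String, Iterable<TableRow>>> joined =
    PCollectionList.of(bqKeyed).and(pubsubKeyed)
        .apply(Flatten.pCollections())
        .apply(GroupByKey.<String, TableRow>create());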
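And strategy 3 looked roughly like the sketch below, calling the google-cloud-bigquery client directly from inside the DoFn (not Beam's BigQueryIO). The class name, query, "key" column, and field copying are placeholders:

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;

public class EnrichFromBqDoFn extends DoFn<PubsubMessage, TableRow> {

    // One copy of the mapping data per worker JVM, loaded lazily.
    private static volatile Map<String, TableRow> mapData;

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        if (mapData == null) {
            synchronized (EnrichFromBqDoFn.class) {
                if (mapData == null) {
                    mapData = loadMapFromBigQuery();
                }
            }
        }
        // ... parse the key out of c.element().getPayload(), look it up in mapData,
        // and c.output(...) the enriched TableRow, as in the side input example below.
    }

    // Pulls the whole mapping table down with the google-cloud-bigquery client.
    // This is the kind of call that kept failing with "Internal error".
    private static Map<String, TableRow> loadMapFromBigQuery() throws Exception {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        QueryJobConfiguration queryConfig =
            QueryJobConfiguration.newBuilder("SELECT whatever FROM ...").build();
        Map<String, TableRow> map = new ConcurrentHashMap<>();
        for (FieldValueList row : bigquery.query(queryConfig).iterateAll()) {
            TableRow tableRow = new TableRow();
            // ... copy the needed fields from row into tableRow
            map.put(row.get("key").getStringValue(), tableRow); // "key" column is a placeholder
        }
        return map;
    }
}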
EDIT:
Even when using a highmem machine type in Dataflow and attempting to load the side input as a Map view, I get the error java.lang.IllegalArgumentException: ByteString would be too long.
Here is an example (pseudocode) of the code I'm using:
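(For context, high-memory workers can be requested through the Dataflow pipeline options, roughly as in the sketch below; the machine type string here is just an example, not necessarily the one I used.)

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(DataflowPipelineOptions.class);
options.setStreaming(true);
// Example high-memory worker type.
options.setWorkerMachineType("n1-highmem-8");
Pipeline pipeline = Pipeline.create(options);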
Pipeline pipeline = Pipeline.create(options);

PCollectionView<Map<String, TableRow>> mapData = pipeline
    .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
    .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn()))
    .apply(View.asMap());

PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
    .fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));

messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        JSONObject data = new JSONObject(new String(c.element().getPayload()));
        String key = getKeyFromData(data);
        TableRow sideInputData = c.sideInput(mapData).get(key);
        if (sideInputData != null) {
            LOG.info("holyWowItWOrked");
            c.output(new TableRow());
        } else {
            LOG.info("noSideInputDataHere");
        }
    }
}).withSideInputs(mapData));
The pipeline throws the exception and fails before writing anything out from the ParDo.
Stack trace:
java.lang.IllegalArgumentException: ByteString would be too long: 644959474+1551393497
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.concat(ByteString.java:524)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:576)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.copyFrom(ByteString.java:559)
    com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString$Output.toByteString(ByteString.java:1006)
    com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
    com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
    com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951)
    com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
    com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
    com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)