Reading from PubsubIO and writing to DatastoreIO

Is it possible to create a pipeline that reads data from Pub/Sub and writes to Datastore? In my code I specify PubsubIO as the input and use windowing to get a bounded PCollection, but it seems impossible to use DatastoreIO.writeTo when options.setStreaming(true) is set, while streaming mode is required in order to use PubsubIO as an input. Is there any way around this? Or is it simply impossible to read from Pub/Sub and write to Datastore?

Here is my code:

DataflowPipelineOptions options = PipelineOptionsFactory.create()
    .as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectName);
options.setStagingLocation("gs://my-staging-bucket/staging");
options.setStreaming(true);

Pipeline p = Pipeline.create(options);

PCollection<String> input = p.apply(PubsubIO.Read.topic("projects/"+projectName+"/topics/event-streaming"));

PCollection<String> inputWindow = input.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
    .triggering(AfterPane.elementCountAtLeast(1))
    .discardingFiredPanes()
    .withAllowedLateness(Duration.standardHours(1)));

PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() {
    private static final long serialVersionUID = 1L;
    public void processElement(ProcessContext c) {
        String msg = c.element();
        byte[] decoded = Base64.decodeBase64(msg.getBytes());
        String outmsg = new String(decoded);
        c.output(outmsg);
    }
}));

PCollection<DatastoreV1.Entity> inputEntity = inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events")));

inputEntity.apply(DatastoreIO.writeTo(datasetid));

p.run();

And this exception I get:

Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner.
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488)
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480)
    at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314)
    at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
    at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
    at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312)
    at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
    at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
    at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159)
    at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104)
2 answers

The streaming runner does not currently support DatastoreIO. To write to Datastore from a streaming pipeline, you can make direct calls to the Datastore API from inside a DoFn.
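A minimal sketch of that approach, using the Cloud Datastore v1beta2 client and the DatastoreHelper static helpers (the dataset id "my-dataset", the kind "my-kind" and the property name "payload" are placeholders for your own values):

input.apply(ParDo.of(new DoFn<String, Void>() {
    private static final long serialVersionUID = 1L;
    public void processElement(ProcessContext c) throws Exception {
        // Connect to Cloud Datastore with credentials taken from the worker environment.
        Datastore datastore = DatastoreFactory.get().create(
            getOptionsFromEnv().dataset("my-dataset").build());

        // Build an entity from the element and insert it with an auto-allocated id.
        Entity.Builder entity = Entity.newBuilder().setKey(makeKey("my-kind"));
        entity.addProperty(makeProperty("payload", makeValue(c.element())));
        CommitRequest commit = CommitRequest.newBuilder()
            .setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
            .setMutation(Mutation.newBuilder().addInsertAutoId(entity))
            .build();
        datastore.commit(commit);
    }
}));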


Well, after banging my head against the wall for a while, I finally got this working. As Daniel suggested, I call the Datastore API from a DoFn inside a ParDo. One of my problems was that I had not realized there is a separate API for using Cloud Datastore outside of App Engine (com.google.api.services.datastore... vs. com.google.appengine.api.datastore...). The other problem was that there seems to be a bug in the latest version of the Cloud Datastore client library (google-api-services-datastore-protobuf v1beta2-rev1-4.0.0; I got an IllegalAccessError), which I worked around by using an older version (v1beta2-rev1-2.1.2).
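For reference, the two packages are easy to confuse; a streaming Dataflow pipeline needs the Cloud Datastore client, not the App Engine Datastore API (the App Engine import is shown only for contrast):

// Cloud Datastore client, usable outside App Engine (what the code below uses):
import com.google.api.services.datastore.client.Datastore;
// App Engine Datastore API, only available inside the App Engine runtime:
// import com.google.appengine.api.datastore.DatastoreService;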

So here is my working code:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.api.services.datastore.DatastoreV1.*;
import com.google.api.services.datastore.client.Datastore;
import com.google.api.services.datastore.client.DatastoreException;
import com.google.api.services.datastore.client.DatastoreFactory;
import static com.google.api.services.datastore.client.DatastoreHelper.*;
import java.security.GeneralSecurityException;
import java.io.IOException;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

//--------------------

public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);
    options.setProject(projectName);
    options.setStagingLocation("gs://my-staging-bucket/staging");
    options.setStreaming(true);

    Pipeline p = Pipeline.create(options);

    PCollection<String> input = p.apply(PubsubIO.Read.topic("projects/"+projectName+"/topics/my-topic-name"));

    input.apply(ParDo.of(new DoFn<String, String>() {
        private static final long serialVersionUID = 1L;
        public void processElement(ProcessContext c) throws ParseException, DatastoreException {
            JSONObject json = (JSONObject) new JSONParser().parse(c.element());

            Datastore datastore = null;
            try {
                datastore = DatastoreFactory.get().create(getOptionsFromEnv()
                    .dataset(datasetid).build());
            } catch (GeneralSecurityException exception) {
                System.err.println("Security error connecting to the datastore: " + exception.getMessage());
            } catch (IOException exception) {
                System.err.println("I/O error connecting to the datastore: " + exception.getMessage());
            }

            Key.Builder keyBuilder = makeKey("my-kind");
            keyBuilder.getPartitionIdBuilder().setNamespace("my-namespace");
            Entity.Builder event = Entity.newBuilder()
                .setKey(keyBuilder);
            event.addProperty(makeProperty("my-prop", makeValue((String) json.get("my-prop"))));

            CommitRequest commitRequest = CommitRequest.newBuilder()
                .setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
                .setMutation(Mutation.newBuilder().addInsertAutoId(event))
                .build();

            if (datastore != null) {
                datastore.commit(commitRequest);
            }
        }
    }));

    p.run();
}

And the dependencies in pom.xml:

<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>[1.0.0,2.0.0)</version>
</dependency>
<dependency>
  <groupId>com.google.apis</groupId>
  <artifactId>google-api-services-datastore-protobuf</artifactId>
  <version>v1beta2-rev1-2.1.2</version>
</dependency>
<dependency>
  <groupId>com.google.http-client</groupId>
  <artifactId>google-http-client</artifactId>
  <version>1.17.0-rc</version>
</dependency>
<!-- Some more.. like JUnit etc.. -->
