Well, after banging my head against the wall for a while, I finally got it working. Like Daniel, I call the Datastore API from a ParDo DoFn. One problem was that I did not realize there is a separate API for using Cloud Datastore outside of App Engine (com.google.api.services.datastore... vs. com.google.appengine.api.datastore...). Another problem was that the latest version of the Cloud Datastore protobuf client (google-api-services-datastore-protobuf v1beta2-rev1-4.0.0) seems to have a bug (I got an IllegalAccessError), so I settled on the older version (v1beta2-rev1-2.1.2) instead.
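To make the distinction concrete, here are the two similarly named packages side by side (only the package names come from the answer; the specific class names are illustrative):

```java
// The Cloud Datastore client that works outside App Engine (e.g. on Dataflow workers):
import com.google.api.services.datastore.client.Datastore;

// The App Engine-only API, which will NOT work from Dataflow workers:
// import com.google.appengine.api.datastore.DatastoreService;
```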
So here is my working code:
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

import com.google.api.services.datastore.DatastoreV1.*;
import com.google.api.services.datastore.client.Datastore;
import com.google.api.services.datastore.client.DatastoreException;
import com.google.api.services.datastore.client.DatastoreFactory;
import static com.google.api.services.datastore.client.DatastoreHelper.*;

import java.io.IOException;
import java.security.GeneralSecurityException;

import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

//--------------------

public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);
    options.setProject(projectName);
    options.setStagingLocation("gs://my-staging-bucket/staging");
    options.setStreaming(true);

    Pipeline p = Pipeline.create(options);
    PCollection<String> input = p.apply(
        PubsubIO.Read.topic("projects/" + projectName + "/topics/my-topic-name"));

    input.apply(ParDo.of(new DoFn<String, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ProcessContext c)
                throws ParseException, DatastoreException {
            JSONObject json = (JSONObject) new JSONParser().parse(c.element());

            // Connect to Cloud Datastore using credentials from the environment.
            Datastore datastore = null;
            try {
                datastore = DatastoreFactory.get().create(
                    getOptionsFromEnv().dataset(datasetid).build());
            } catch (GeneralSecurityException exception) {
                System.err.println("Security error connecting to the datastore: "
                    + exception.getMessage());
            } catch (IOException exception) {
                System.err.println("I/O error connecting to the datastore: "
                    + exception.getMessage());
            }

            // Build the entity key (kind + namespace) and the entity itself.
            Key.Builder keyBuilder = makeKey("my-kind");
            keyBuilder.getPartitionIdBuilder().setNamespace("my-namespace");
            Entity.Builder event = Entity.newBuilder().setKey(keyBuilder);
            event.addProperty(makeProperty("my-prop",
                makeValue((String) json.get("my-prop"))));

            // Insert with an auto-allocated ID, outside a transaction.
            CommitRequest commitRequest = CommitRequest.newBuilder()
                .setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
                .setMutation(Mutation.newBuilder().addInsertAutoId(event))
                .build();
            if (datastore != null) {
                datastore.commit(commitRequest);
            }
        }
    }));

    p.run();
}
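One possible refinement (a sketch of my own, not part of the original code): processElement above creates a fresh Datastore client for every Pub/Sub message. The Dataflow SDK 1.x DoFn also exposes startBundle, which runs once per bundle of elements, so the connection setup can be hoisted there and reused. This assumes the same imports and variables (datasetid) as the code above:

```java
// Sketch: create the Datastore client once per bundle instead of once per element.
input.apply(ParDo.of(new DoFn<String, String>() {
    private static final long serialVersionUID = 1L;
    // transient: the client is not serializable, so it is rebuilt on each worker.
    private transient Datastore datastore;

    @Override
    public void startBundle(Context c)
            throws GeneralSecurityException, IOException {
        datastore = DatastoreFactory.get().create(
            getOptionsFromEnv().dataset(datasetid).build());
    }

    @Override
    public void processElement(ProcessContext c)
            throws ParseException, DatastoreException {
        // ... build and commit the entity exactly as above,
        // but reuse this.datastore instead of creating a new client ...
    }
}));
```

This mainly saves the repeated credential/connection setup; the commit itself is unchanged.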
And the dependencies in pom.xml:
<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>[1.0.0,2.0.0)</version>
</dependency>
<dependency>
  <groupId>com.google.apis</groupId>
  <artifactId>google-api-services-datastore-protobuf</artifactId>
  <version>v1beta2-rev1-2.1.2</version>
</dependency>
<dependency>
  <groupId>com.google.http-client</groupId>
  <artifactId>google-http-client</artifactId>
  <version>1.17.0-rc</version>
</dependency>