I am trying to stream data from Google Pub/Sub to BigQuery using the Python Dataflow SDK. For testing purposes, I adapted the following example, https://github.com/GoogleCloudPlatform/DataflowSDK-examples/blob/master/python/dataflow_examples/cookbook/bigquery_schema.py, into a streaming pipeline by setting
options.view_as(StandardOptions).streaming = True
Then I changed the record_ids step of the pipeline to read from Pub/Sub:
# ADDED THIS
lines = p | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC) | beam.WindowInto(window.FixedWindows(15))
# CHANGED THIS
# record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
record_ids = lines | 'Split' >> (beam.FlatMap(split_fn).with_output_types(unicode))
records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
records | 'Write' >> beam.io.Write(
    beam.io.BigQuerySink(
        OUTPUT,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
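For readers who do not have the linked file open, here is a rough sketch of what the two per-element functions might look like and how the FlatMap and Map steps use them. It is plain Python with no Beam dependency. The function bodies, the message format ("1 2 3") and the column names are assumptions for illustration only, not the code from the linked repository.

```python
import random


def split_fn(message):
    """Split one Pub/Sub message payload into record IDs."""
    # Assumption: each message is a text string of whitespace-separated IDs.
    return message.split()


def create_random_record(record_id):
    """Build one row as a dict keyed by column name."""
    # Hypothetical columns; in a real pipeline they must match table_schema.
    return {
        'kind': 'kind' + record_id,
        'fullName': 'fullName' + record_id,
        'age': random.randint(0, 99),
    }


# Local stand-in for the pipeline steps, using ordinary lists:
# FlatMap emits zero or more outputs per input, Map emits exactly one.
messages = ['1 2', '3']
record_ids = [rid for m in messages for rid in split_fn(m)]
records = [create_random_record(rid) for rid in record_ids]
```

With the two sample messages above, record_ids holds three IDs and records holds three dicts, one row per ID. The sink at the end of the pipeline receives rows of this shape.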
Note: I have been whitelisted by Google to run this code (the feature is in alpha).
Now when I try to run it, I get an error:
Failed to complete workflow. Reasons: (f215df7c8fcdbb00): Unknown stream sink: bigquery
Here you can find the full code: https://github.com/marcorigodanzo/gcp_streaming_test/blob/master/my_bigquery_schema.py
I think this happens because the pipeline is now a streaming pipeline. Can someone tell me how to write to BigQuery from a streaming pipeline?
Beam Python does not support writing to BigQuery from streaming pipelines. For now, you need to use Beam Java, where you can use PubsubIO.readStrings() to read from Pub/Sub and BigQueryIO.writeTableRows() to write to BigQuery.
Source: https://habr.com/ru/post/1271579/