I have a Dataflow job that imports data from Bigtable into BigQuery, using the built-in Dataflow connectors for both. I have two questions:
Question 1: If the source data lives in one big table in Bigtable, how can I split it dynamically into a set of smaller tables in BigQuery, based, say, on the Bigtable row key, which is only known at runtime?
The Java code in Dataflow is as follows:
p.apply(Read.from(CloudBigtableIO.read(config)))
.apply(ParDo.of(new SomeDoFNonBTSourceData()))
.apply(BigQueryIO.Write
.to(PROJ_ID + ":" + BQ_DataSet + "." + BQ_TableName)
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
p.run();
So, since BQ_TableName has to be supplied at the code level, how can I set it programmatically based on what is visible inside SomeDoFNonBTSourceData, such as the range the current RowKey falls into? For example, row keys starting with "a" through "c" would go to TableA, "d" through "f" to TableB, and so on.
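Conceptually, what I am after is per-element routing along the lines of the sketch below. This is only a sketch: it uses the table-function overload of the newer Beam 2.x BigQueryIO rather than the 1.x style API shown above, "rows" stands for the PCollection<TableRow> produced by the ParDo, and "RowKey" is a hypothetical field that SomeDoFNonBTSourceData would have to add to each TableRow (hence ignoreUnknownValues(), so BQ drops it on write).
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import com.google.api.services.bigquery.model.TableRow;

rows.apply(BigQueryIO.writeTableRows()
    .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
        @Override
        public TableDestination apply(ValueInSingleWindow<TableRow> value) {
            // Route on the hypothetical "RowKey" field: "a".."c" -> TableA, otherwise TableB.
            String rowKey = (String) value.getValue().get("RowKey");
            String suffix = rowKey.compareTo("d") < 0 ? "A" : "B";
            return new TableDestination(PROJ_ID + ":" + BQ_DataSet + ".Table" + suffix, null);
        }
    })
    .withSchema(schema)
    // WRITE_APPEND in this sketch; I am not sure how WRITE_TRUNCATE behaves when one run fans out to many tables.
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .ignoreUnknownValues());
Is something like this supported, or does the split have to happen earlier in the pipeline (for example with a Partition transform into one PCollection per destination table)?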
Question 2: What is the correct way to export the Bigtable timestamp to BigQuery, so that it eventually comes out in a readable format in BigQuery?
The processElement function inside DoFn is as follows:
public void processElement(ProcessContext c)
{
    TableRow tr = new TableRow();
    Cell cell = c.element().getColumnLatestCell(COL_FAM, COL_NAME);
    // CellUtil.cloneValue() copies only this cell's value bytes; getValueArray()
    // alone would return the whole backing array, not just this cell's value.
    String valA = new String(CellUtil.cloneValue(cell));
    Long timeStamp = cell.getTimestamp();
    tr.put("ColA", valA);
    tr.put("TimeStamp", timeStamp);
    c.output(tr);
}
And while building the pipeline, the BQ schema for the TimeStamp column is set up as follows:
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("ColA").setType("STRING"));
fields.add(new TableFieldSchema().setName("TimeStamp").setType("TIMESTAMP"));
schema = new TableSchema().setFields(fields);
So, the timestamp coming out of Bigtable is a Long, and I am not sure whether "TIMESTAMP" or "INTEGER" is the right type for the TimeStamp column in BQ (i.e. how that Long should map into BQ). With the column declared as "TIMESTAMP", the values do not show up in a readable form (presumably a units mismatch). With "INTEGER", the raw value is there, but I would have to CAST it back into a timestamp in every query, which does not feel like the right approach. What is the recommended way to handle this?
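For reference, what I am currently thinking of trying (a sketch only, on the assumption that the HBase-style client returns the Bigtable timestamp as milliseconds since the epoch, and that a BQ TIMESTAMP column accepts a numeric value of seconds since the epoch) is converting inside the DoFn and keeping the "TIMESTAMP" type in the schema:
// Sketch: turn the millisecond Long from getTimestamp() into (fractional) seconds,
// so the value lands in the TIMESTAMP column in a human-readable form.
Long timeStampMillis = cell.getTimestamp();
tr.put("TimeStamp", timeStampMillis / 1000.0);
Is that the intended way, or is there a more idiomatic conversion (for example formatting the Long into a date/time string before writing)?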