Bigtable-BigQuery Import via DataFlow: 2 questions on splitting tables and timestamps

I have a Dataflow job that imports data from Bigtable into BigQuery, using the built-in Dataflow APIs for both. I have two questions:

Question 1: If the source data is in one big table in Bigtable, how can I split it dynamically into a set of smaller tables in BigQuery based on, say, the Bigtable row key, which is known only at runtime?

The Java code in Dataflow is as follows:

p.apply(Read.from(CloudBigtableIO.read(config)))
 .apply(ParDo.of(new SomeDoFNonBTSourceData()))
 .apply(BigQueryIO.Write
        .to(PROJ_ID + ":" + BQ_DataSet + "." + BQ_TableName)
        .withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
p.run();

So, since BQ_TableName has to be provided at the code level, how can I set it programmatically based on what is visible inside SomeDoFNonBTSourceData, such as the range of values the current RowKey falls into? If RowKey is in "a-c", then TableA; if in "d-f", then TableB; and so on.
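To make the intended mapping concrete, here is a minimal sketch of the routing rule on its own, outside any pipeline. The class name RowKeyRouter, the table names, the range boundaries, and the fallback table are all hypothetical; only the idea of choosing a table from the row-key range comes from the question.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: picks a BigQuery table name from the row-key range.
class RowKeyRouter {
    // Lower bound of each range -> table name. Boundaries and names are
    // illustrative assumptions, not taken from a real schema.
    private static final TreeMap<String, String> RANGES = new TreeMap<>();
    static {
        RANGES.put("a", "TableA");     // keys from "a" up to (not including) "d"
        RANGES.put("d", "TableB");     // keys from "d" up to (not including) "g"
        RANGES.put("g", "TableOther"); // everything from "g" onwards
    }

    static String tableFor(String rowKey) {
        // floorEntry finds the greatest lower bound that is <= rowKey.
        Map.Entry<String, String> entry = RANGES.floorEntry(rowKey);
        // Keys that sort before "a" (digits, upper case, empty) use the fallback.
        return entry == null ? "TableOther" : entry.getValue();
    }

    public static void main(String[] args) {
        System.out.println(tableFor("apple")); // TableA
        System.out.println(tableFor("delta")); // TableB
        System.out.println(tableFor("zebra")); // TableOther
    }
}
```

The full table spec would then be built the same way as in the pipeline above, for example PROJ_ID + ":" + BQ_DataSet + "." + RowKeyRouter.tableFor(rowKey). How that per-row name is handed to the sink depends on the SDK version: later Apache Beam releases have overloads of BigQueryIO's to(...) that compute the destination per element, which may or may not be available in the SDK used here.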

Question 2: What is the correct way to export a Bigtable timestamp to BigQuery so that it can eventually be displayed in a human-readable format in BigQuery?

The processElement function inside DoFn is as follows:

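// Uses org.apache.hadoop.hbase.Cell, org.apache.hadoop.hbase.CellUtil,
// org.apache.hadoop.hbase.util.Bytes and the BigQuery model class TableRow.
public void processElement(ProcessContext c)
{
    // Look the cell up once instead of once per field.
    Cell cell = c.element().getColumnLatestCell(COL_FAM, COL_NAME);
    // getValueArray() returns the whole backing array; cloneValue() copies only the value bytes.
    String valA = Bytes.toString(CellUtil.cloneValue(cell));
    long timeStamp = cell.getTimestamp();

    // tr was not declared in the original snippet; a fresh row per element is assumed here.
    TableRow tr = new TableRow();
    tr.put("ColA", valA);
    tr.put("TimeStamp", timeStamp);
    c.output(tr);
}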

And while building the pipeline, the BQ schema for the TimeStamp column is set up as follows:

List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("ColA").setType("STRING"));
fields.add(new TableFieldSchema().setName("TimeStamp").setType("TIMESTAMP"));
schema = new TableSchema().setFields(fields);

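Since the Bigtable timestamp is a Long, I tried both "TIMESTAMP" and "INTEGER" as the type of the TimeStamp column in BQ (BQ does not seem to have a Long type as such). Ultimately I need the TimeStamp column in BQ in a readable form (date and time), but I have not managed to CAST the stored values into anything meaningful.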


(This answer addresses Question 2 only, not Question 1.)

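If the Long is a UNIX timestamp in seconds, convert it to a date-time string before writing it, so that BQ can store it in a TIMESTAMP column.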

...

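// Example value: a UNIX timestamp in seconds.
long longTimeStamp = 1408452095L;

// java.util.Date works in milliseconds since the epoch, hence the factor of 1000.
Date timeStamp = new Date(longTimeStamp * 1000);

// Instant.toString() yields an ISO-8601 UTC string, here 2014-08-19T12:41:35Z.
tr.put("TimeStamp", timeStamp.toInstant().toString());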
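The factor of 1000 above is only right when the Long holds seconds. As a hedged sketch, the same conversion can be written with java.time for each plausible unit; the class name BigtableTimestamps and its method names are hypothetical. Which unit the Bigtable client actually returns is an assumption to verify: HBase-style Cell.getTimestamp() conventionally reports milliseconds, while Bigtable itself stores microseconds.

```java
import java.time.Instant;

// Hypothetical helper: turns an epoch-based long into an ISO-8601 UTC string.
class BigtableTimestamps {
    // The long counts whole seconds since 1970-01-01T00:00:00Z.
    static String fromEpochSeconds(long seconds) {
        return Instant.ofEpochSecond(seconds).toString();
    }

    // The long counts milliseconds since the epoch.
    static String fromEpochMillis(long millis) {
        return Instant.ofEpochMilli(millis).toString();
    }

    // The long counts microseconds since the epoch.
    static String fromEpochMicros(long micros) {
        // floorDiv/floorMod keep the split correct for negative values too.
        long seconds = Math.floorDiv(micros, 1_000_000L);
        long nanos = Math.floorMod(micros, 1_000_000L) * 1_000L;
        return Instant.ofEpochSecond(seconds, nanos).toString();
    }

    public static void main(String[] args) {
        System.out.println(fromEpochSeconds(1408452095L));       // 2014-08-19T12:41:35Z
        System.out.println(fromEpochMillis(1408452095123L));     // 2014-08-19T12:41:35.123Z
        System.out.println(fromEpochMicros(1408452095123456L));  // 2014-08-19T12:41:35.123456Z
    }
}
```

Picking the wrong unit does not fail loudly; it just produces dates that are off by a factor of a thousand or a million, so it is worth checking one known row by hand.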

Source: https://habr.com/ru/post/1667684/

