Change the preprocessing pipeline to use BigQuerySource (use the same Features class as in the CSV samples). Here is an example:
# Feature definitions passed to the preprocessing step below.
feature_set = CsvFeatures()

# Queries selecting the training and validation rows (bodies elided).
train_query = "SELECT …"
valid_query = "SELECt …"

# Read each dataset from BigQuery via a query-backed source.
train_source = beam.io.BigQuerySource(query=train_query)
train = pipeline | 'read_train' >> beam.Read(train_source)

valid_source = beam.io.BigQuerySource(query=valid_query)
# NOTE(review): `eval` shadows the Python builtin of the same name; the
# name is kept here so any code that refers to it keeps working.
eval = pipeline | 'read_valid' >> beam.Read(valid_source)

# Apply the preprocessing transform to both datasets at once; the result
# unpacks into the metadata plus the train and eval features.
metadata, train_features, eval_features = (
    (train, eval) | ml.Preprocess('Preprocess', feature_set)
)
source
share