How to make a Cartesian product of two PCollections in a data stream?

I would like to make a Cartesian product from two PCollections. Neither PCollection can fit in memory, so side entry is not possible.

My goal is this: I have two data sets. One of them is a lot of small items. The other is a bit (~ 10) very large. I would like to take a product of these two elements, and then create objects with a key.

+2
source share
1 answer

I think CoGroupByKey might work in your situation:

https://cloud.google.com/dataflow/model/group-by-key#join

What have I done for a similar use case. Although mine was probably not limited by memory (have you tried a larger cluster with larger machines?):

PCollection<KV<String, TableRow>> inputClassifiedKeyed = inputClassified .apply(ParDo.named("Actuals : Keys").of(new ActualsRowToKeyedRow())); PCollection<KV<String, Iterable<Map<String, String>>>> groupedCategories = p [...] .apply(GroupByKey.create()); 

Thus, collections are entered with the same key.

Then I declared the tags:

  final TupleTag<Iterable<Map<String, String>>> categoryTag = new TupleTag<>(); final TupleTag<TableRow> actualsTag = new TupleTag<>(); 

Combine them:

  PCollection<KV<String, CoGbkResult>> actualCategoriesCombined = KeyedPCollectionTuple.of(actualsTag, inputClassifiedKeyed) .and(categoryTag, groupedCategories) .apply(CoGroupByKey.create()); 

And in my case, the last step is reformatting the results (from the marked groups in a continuous stream:

  actualCategoriesCombined .apply( ParDo.named("Actuals : Formatting") .of( new DoFn<KV<String, CoGbkResult>, TableRow>() { @Override public void processElement(ProcessContext c) throws Exception { KV<String, CoGbkResult> e = c.element(); Iterable<TableRow> actualTableRows = e.getValue().getAll(actualsTag); Iterable<Iterable<Map<String, String>>> categoriesAll = e.getValue().getAll(categoryTag); for (TableRow row : actualTableRows) { // Some of the actuals do not have categories if (categoriesAll.iterator().hasNext()) { row.put("advertiser", categoriesAll.iterator().next()); } c.output(row); } } } ) ) 

Hope this helps. Again - not sure about memory limitations. Please report the results if you try this approach.

+2
source

Source: https://habr.com/ru/post/1241465/


All Articles