I think CoGroupByKey might work in your situation:
https://cloud.google.com/dataflow/model/group-by-key#join
Here is what I did for a similar use case, although mine was probably not limited by memory (have you tried a larger cluster with larger machines?):
```java
PCollection<KV<String, TableRow>> inputClassifiedKeyed = inputClassified
    .apply(ParDo.named("Actuals : Keys").of(new ActualsRowToKeyedRow()));

PCollection<KV<String, Iterable<Map<String, String>>>> groupedCategories = p
    [...]
    .apply(GroupByKey.create());
```
This way, both collections are keyed by the same key.
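To make the `GroupByKey` step concrete: conceptually it gathers every value that shares a key into one iterable per key. Below is a plain-Java, in-memory illustration of that semantics only. It is not the Dataflow API (Dataflow does this with a distributed shuffle), the names `GroupByKeySketch` and `groupByKey` are hypothetical, and the values are simplified to `String` instead of `Map<String, String>`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of GroupByKey semantics; not the Dataflow API.
class GroupByKeySketch {

  // Collect every value that shares a key into one list per key,
  // keeping keys in the order they were first seen.
  static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> input) {
    Map<K, List<V>> grouped = new LinkedHashMap<>();
    for (Map.Entry<K, V> kv : input) {
      grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> keyed = List.of(
        Map.entry("acct-1", "food"),
        Map.entry("acct-2", "travel"),
        Map.entry("acct-1", "rent"));
    System.out.println(groupByKey(keyed)); // {acct-1=[food, rent], acct-2=[travel]}
  }
}
```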
Then I declared the tags:
```java
final TupleTag<Iterable<Map<String, String>>> categoryTag = new TupleTag<>();
final TupleTag<TableRow> actualsTag = new TupleTag<>();
```
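The tags are typed so that values from differently typed inputs can be stored together and still be retrieved with the right type. The sketch below is a hypothetical plain-Java analogue of that idea (`Tag`, `TaggedGroups`, and `getAll` here are made-up names, not the Dataflow `TupleTag`/`CoGbkResult` classes); it only shows why the type parameter on a tag is useful.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java analogue of typed tags; not the Dataflow TupleTag class.
class TypedTagSketch {

  // A tag is an identity object (no equals/hashCode override);
  // its type parameter records what kind of value it labels.
  static final class Tag<T> {
    private final String name;
    Tag(String name) { this.name = name; }
    @Override public String toString() { return name; }
  }

  // Holds values from several differently typed inputs, keyed by tag identity.
  static final class TaggedGroups {
    private final Map<Tag<?>, List<Object>> byTag = new HashMap<>();

    <T> void add(Tag<T> tag, T value) {
      byTag.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
    }

    // Only add(Tag<T>, T) writes under a tag, so this cast is safe in practice.
    @SuppressWarnings("unchecked")
    <T> List<T> getAll(Tag<T> tag) {
      List<Object> values = byTag.getOrDefault(tag, Collections.emptyList());
      return (List<T>) (List<?>) values;
    }
  }

  public static void main(String[] args) {
    Tag<Integer> amountTag = new Tag<>("amount");
    Tag<String> categoryTag = new Tag<>("category");
    TaggedGroups groups = new TaggedGroups();
    groups.add(amountTag, 120);
    groups.add(categoryTag, "food");
    int first = groups.getAll(amountTag).get(0); // typed result, no cast at the call site
    System.out.println(first + " " + groups.getAll(categoryTag)); // 120 [food]
  }
}
```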
Combine them:
```java
PCollection<KV<String, CoGbkResult>> actualCategoriesCombined =
    KeyedPCollectionTuple.of(actualsTag, inputClassifiedKeyed)
        .and(categoryTag, groupedCategories)
        .apply(CoGroupByKey.create());
```
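What `CoGroupByKey` produces per key can be illustrated in memory: for every key that appears in either input, you get all values from the first input and all values from the second, with an empty side when a key is missing from one input. The following is a plain-Java sketch of that behaviour only (hypothetical names `CoGroupSketch`, `Joined`, `coGroup`; simplified value types), not how Dataflow implements the join.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of a two-input CoGroupByKey; not the Dataflow API.
class CoGroupSketch {

  // Per-key result: everything from the left input and everything from the right input.
  static final class Joined<L, R> {
    final List<L> left = new ArrayList<>();
    final List<R> right = new ArrayList<>();

    @Override public String toString() { return "left=" + left + ", right=" + right; }
  }

  static <K, L, R> Map<K, Joined<L, R>> coGroup(
      List<Map.Entry<K, L>> left, List<Map.Entry<K, R>> right) {
    Map<K, Joined<L, R>> out = new LinkedHashMap<>();
    for (Map.Entry<K, L> kv : left) {
      out.computeIfAbsent(kv.getKey(), k -> new Joined<>()).left.add(kv.getValue());
    }
    // Keys present only in the right input still get an entry (with an empty left side).
    for (Map.Entry<K, R> kv : right) {
      out.computeIfAbsent(kv.getKey(), k -> new Joined<>()).right.add(kv.getValue());
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> actuals = List.of(
        Map.entry("acct-1", 120), Map.entry("acct-2", 40), Map.entry("acct-1", 75));
    List<Map.Entry<String, String>> categories = List.of(
        Map.entry("acct-1", "food"), Map.entry("acct-3", "travel"));
    coGroup(actuals, categories).forEach((k, v) -> System.out.println(k + ": " + v));
    // acct-1: left=[120, 75], right=[food]
    // acct-2: left=[40], right=[]
    // acct-3: left=[], right=[travel]
  }
}
```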
In my case, the last step is reformatting the results, turning the tagged groups back into a single flat stream of rows:
```java
actualCategoriesCombined.apply(
    ParDo.named("Actuals : Formatting").of(
        new DoFn<KV<String, CoGbkResult>, TableRow>() {
          @Override
          public void processElement(ProcessContext c) throws Exception {
            KV<String, CoGbkResult> e = c.element();
            // All values that arrived under this key, split by tag.
            Iterable<TableRow> actualTableRows = e.getValue().getAll(actualsTag);
            Iterable<Iterable<Map<String, String>>> categoriesAll =
                e.getValue().getAll(categoryTag);
            for (TableRow row : actualTableRows) {
              // ... (rest of the formatting logic is not included here)
            }
          }
        }));
```
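The loop body above is not shown, so here is one hypothetical way to finish the formatting step, assuming each output row should simply carry all category maps for its key. It is plain Java with rows modelled as `Map<String, Object>` (a `TableRow` behaves like a map, but this sketch does not use it); `FlattenSketch`, `formatGroup`, and the `"categories"` field name are made up for illustration. Note that `categoriesAll` is nested one level deeper because it went through `GroupByKey` before `CoGroupByKey`, so the sketch flattens it first.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the formatting step; the real loop body is not shown in the answer.
class FlattenSketch {

  // For one key: copy each actual row and attach that key's category maps,
  // so every actual row becomes exactly one output row.
  static List<Map<String, Object>> formatGroup(
      List<Map<String, Object>> actualRows,
      List<List<Map<String, String>>> categoriesAll) {
    // categoriesAll mirrors Iterable<Iterable<Map<String, String>>>: flatten one level.
    List<Map<String, String>> categories = new ArrayList<>();
    for (List<Map<String, String>> group : categoriesAll) {
      categories.addAll(group);
    }
    List<Map<String, Object>> out = new ArrayList<>();
    for (Map<String, Object> row : actualRows) {
      Map<String, Object> copy = new LinkedHashMap<>(row); // don't mutate the input row
      copy.put("categories", categories); // hypothetical output field name
      out.add(copy);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> actuals = List.of(Map.of("amount", 120), Map.of("amount", 75));
    List<List<Map<String, String>>> cats = List.of(List.of(Map.of("category", "food")));
    System.out.println(formatGroup(actuals, cats));
  }
}
```

Copying each row keeps the sketch safe with immutable inputs; in a real `DoFn` you would likewise avoid modifying the input element and output a new row instead.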
Hope this helps. Again, I'm not sure about the memory limitations, so please report the results if you try this approach.