The problem described below was reported in GitHub Issue #57.
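Dataflow fails to infer a coder when a transform's output type depends on a generic type parameter. For example, consider the following code: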
// Generic value type used in the example; T is bounded by Serializable (body elided).
class CustomType<T extends Serializable> { ... }
// Composite transform (problematic version). ParDoFn turns each input String into a
// KV<String, Set<CustomType<T>>>, then Combine.perKey merges the sets for each key
// using Merger.
// ParDoFn is a non-static inner class, so its output type mentions Processor's type
// variable T. When the transform is created as `new Processor<String>()`, the binding
// T = String exists only at compile time (erasure). T therefore cannot be resolved
// reflectively, and Dataflow cannot infer a coder for ParDoFn's output.
class Processor<T extends Serializable>
extends PTransform<PCollection<String>,
PCollection<KV<String, Set<CustomType<T>>>>> {
// Emits KV<String, Set<CustomType<T>>> for each input String (body elided).
class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> { … }
// Combines Set<CustomType<T>> values pairwise (body elided).
class Merger extends BinaryCombineFn<Set<CustomType<T>>> { … }
@Override
public PCollection<KV<String, Set<CustomType<T>>>>
apply(PCollection<String> items) {
// Dataflow fails to infer a coder for this PCollection, because its element
// type still contains the unresolved T.
PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
items.apply(ParDo.of(new ParDoFn()));
// Explicit type witnesses: key String, input Set<CustomType<T>>, output Set<CustomType<T>>.
PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
partitionedItems.apply(
Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
new Merger()));
return combinedItems;
}
}
…
// Plain instantiation: the type argument String is erased at runtime, so the
// transform's runtime class carries no record that T = String.
PCollection<String> input = ...
input.apply(new Processor<String>());
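Dataflow determines the output type of a DoFn from the TypeDescriptor returned by its getOutputTypeDescriptor method.
ParDoFn is an inner class of Processor<T>, so its output element type contains Set<CustomType<T>>, even though we instantiated Processor<String>.
Because of type erasure, ParDoFn has no way to find out what T is, so the output coder cannot be inferred. Fixing this takes two steps.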
1. Instantiate Processor as an anonymous subclass:
// The trailing {} creates an anonymous subclass. Its generic superclass
// Processor<String> is recorded in the class file, so T = String becomes
// discoverable through reflection.
PCollection<String> input = ...
input.apply(new Processor<String>() {});
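The anonymous subclass of Processor now records that T is String. However, ParDoFn still resolves its output type against its own class rather than against Processor, so this step alone is not enough.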
2. Override getOutputTypeDescriptor in ParDoFn so that the type is resolved against the runtime class of the enclosing Processor:
// Step 2 (fragment; Processor's closing brace and other members are omitted).
// ParDoFn overrides getOutputTypeDescriptor so that T is resolved against the runtime
// class of the enclosing Processor (Processor.this.getClass()) instead of ParDoFn's
// own class. Combined with an anonymous subclass such as `new Processor<String>() {}`,
// this lets T resolve to String.
class Processor<T extends Serializable> extends ... {
class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
@Override
protected TypeDescriptor<KV<String, Set<CustomType<T>>>>
getOutputTypeDescriptor() {
// The trailing {} creates an anonymous TypeDescriptor subclass, which captures the
// full generic type argument KV<String, Set<CustomType<T>>>. The constructor argument
// supplies the class whose generic signature is used to resolve T.
return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
Processor.this.getClass()) {};
}
}
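Putting both steps together gives the following code, which resolves the problem from GitHub Issue #57. Processor is made abstract so that it must always be instantiated as an anonymous subclass.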
// Generic value type used in the example; T is bounded by Serializable (body elided).
class CustomType<T extends Serializable> { ... }
// Composite transform (working version). ParDoFn turns each input String into a
// KV<String, Set<CustomType<T>>>, then Combine.perKey merges the sets for each key
// using Merger.
// Declared abstract so that callers must write `new Processor<X>() {}`. The resulting
// anonymous subclass records the concrete type argument for T in its generic superclass.
// NOTE(review): if the anonymous subclass is created in a non-static context, it also
// captures the enclosing instance, which then travels with the serialized DoFn;
// confirm that instance is serializable.
abstract class Processor<T extends Serializable>
extends PTransform<PCollection<String>,
PCollection<KV<String, Set<CustomType<T>>>>> {
class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
...
// Resolves T against the runtime class of the enclosing Processor (the anonymous
// subclass) instead of ParDoFn's own class, where T would remain unresolved.
@Override
protected TypeDescriptor<KV<String, Set<CustomType<T>>>>
getOutputTypeDescriptor() {
return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
Processor.this.getClass()) {};
}
}
// Combines Set<CustomType<T>> values pairwise (body elided).
class Merger extends BinaryCombineFn<Set<CustomType<T>>> { ... }
@Override
public PCollection<KV<String, Set<CustomType<T>>>> apply(PCollection<String> items) {
// With the override above, the element type of this PCollection can be fully resolved.
PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
items.apply(ParDo.of(new ParDoFn()));
// Explicit type witnesses: key String, input Set<CustomType<T>>, output Set<CustomType<T>>.
PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
partitionedItems.apply(
Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
new Merger()));
return combinedItems;
}
}
// Processor is abstract, so it must be instantiated as an anonymous subclass (the
// trailing {}). This records T = String in the subclass's generic superclass.
PCollection<String> input = …;
input.apply(new Processor<String>() {});
Alternatively, you can override Processor.getDefaultOutputCoder, or call setCoder on partitionedItems to specify the coder explicitly.