Creating Generic Transforms in Dataflow

This is related to another SO question, "Customizing custom encoders and handling parameterized types". The workarounds suggested there let me use custom types in transforms. Since my custom types are generic, I hoped to make the transform classes generic as well, so that a transform could parameterize the custom type with its own type parameter. When I try this, I get an error saying that a Coder cannot be provided for type variable T because the actual type is unknown due to erasure. The workaround suggested registering a coder that returns the type parameter, but since the type parameter itself is unknown here, I assume that is why the exception is thrown, and I am not sure how to get around it.

static class Processor<T>
  extends PTransform<PCollection<String>,
                     PCollection<KV<String, Set<CustomType<T>>>>> {

  private static final long serialVersionUID = 0;

  @Override public PCollection<KV<String, Set<CustomType<T>>>>
  apply(PCollection<String> items) {
    PCollection<KV<String, Set<CustomType<T>>>> partitionedItems = items
        .apply(ParDo.of(new ParDoFn()));
    PCollection<KV<String, Set<CustomType<T>>>> combinedItems = partitionedItems
        .apply(Combine.<String, Set<CustomType<T>>>perKey(new Merger()));
    // The method must return the resulting collection.
    return combinedItems;
  }
}

This appears to be related to Github Issue № 57.

Consider the following Dataflow code:

class CustomType<T extends Serializable> { ... }

class Processor<T extends Serializable>
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Set<CustomType<T>>>>> {

    class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> { … }

    class Merger extends BinaryCombineFn<Set<CustomType<T>>> { … }

    @Override
    public PCollection<KV<String, Set<CustomType<T>>>>
    apply(PCollection<String> items) {

      PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
          items.apply(ParDo.of(new ParDoFn()));

      PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
          partitionedItems.apply(
              Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
                  new Merger()));

      return combinedItems;
    }
}


PCollection<String> input = ...
input.apply(new Processor<String>());

Dataflow obtains a TypeDescriptor for the output type from the DoFn by calling getOutputTypeDescriptor.

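Because ParDoFn is an inner class of Processor<T>, the output type descriptor is just Set<CustomType<T>>, with T unresolved, even when the transform is instantiated as new Processor<String>.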
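
The effect can be reproduced with plain Java reflection, without the Dataflow SDK. The following is a minimal sketch, not Dataflow's actual implementation: Fn, Outer, and InnerFn are hypothetical stand-ins for DoFn, Processor, and ParDoFn, and the helper methods are assumptions made for illustration.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.Set;

public class InnerClassTypeDemo {

  // Hypothetical stand-in for DoFn; only the output type parameter is modeled.
  static class Fn<OutputT> {}

  // Hypothetical stand-in for Processor<T>.
  static class Outer<T> {
    // Non-static inner class, so its supertype may mention the outer T.
    class InnerFn extends Fn<Set<T>> {}

    InnerFn newInnerFn() {
      return new InnerFn();
    }
  }

  // Returns the declared output type of fn.
  // Assumes fn's class directly extends Fn with explicit type arguments.
  static Type outputTypeOf(Fn<?> fn) {
    ParameterizedType superclass =
        (ParameterizedType) fn.getClass().getGenericSuperclass();
    return superclass.getActualTypeArguments()[0];
  }

  // True if the type is, or contains, an unresolved type variable.
  // Wildcards and generic arrays are ignored to keep the sketch short.
  static boolean hasTypeVariable(Type type) {
    if (type instanceof TypeVariable) {
      return true;
    }
    if (type instanceof ParameterizedType) {
      for (Type arg : ((ParameterizedType) type).getActualTypeArguments()) {
        if (hasTypeVariable(arg)) {
          return true;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Even though the outer object was created as Outer<String>,
    // the inner class only records Set<T>.
    Type output = outputTypeOf(new Outer<String>().newInnerFn());
    System.out.println(output + " unresolved: " + hasTypeVariable(output));
  }
}
```

The String supplied at the call site is erased at runtime, so reflection on the inner class alone can only see the variable T.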

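To recover the type information, ParDoFn needs to know statically which type was supplied for T. This takes two steps.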

1. Create an anonymous subclass of Processor

PCollection<String> input = ...
input.apply(new Processor<String>() {});

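This ensures that, for all inner classes of this Processor instance, T is statically bound to String. In this case it is probably best to make Processor an abstract class, so that callers are required to subclass it.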
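
Why the trailing {} matters can be seen with plain reflection. This is a minimal sketch under stated assumptions: Holder is a hypothetical stand-in for a generic transform such as Processor<T>, and boundTypeArgument is a helper invented for the illustration.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class AnonymousSubclassDemo {

  // Hypothetical stand-in for a generic transform such as Processor<T>.
  static class Holder<T> {}

  // Returns the type argument that the object's direct superclass binds to T,
  // or null when the object is a plain Holder instance (T was erased).
  static Type boundTypeArgument(Holder<?> holder) {
    Type superclass = holder.getClass().getGenericSuperclass();
    if (superclass instanceof ParameterizedType) {
      return ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }
    return null;
  }

  public static void main(String[] args) {
    // Plain instance: the class is Holder itself, whose superclass is Object.
    System.out.println(boundTypeArgument(new Holder<String>()));    // prints "null"

    // Anonymous subclass: its class file records "extends Holder<String>".
    System.out.println(boundTypeArgument(new Holder<String>() {})); // prints "class java.lang.String"
  }
}
```

The anonymous subclass is a distinct class whose generic superclass is stored in its class file, which is what makes the binding of T to String available at runtime.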

2. Override getOutputTypeDescriptor in ParDoFn so that it resolves its types against the outer class, Processor.

class Processor<T extends Serializable> extends ... {
  class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
    @Override
    protected TypeDescriptor<KV<String, Set<CustomType<T>>>>
    getOutputTypeDescriptor() {
      return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
        Processor.this.getClass()) {};
    }
  }
}
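
What "resolving against the outer class" means can be sketched with plain reflection. This is only an approximation of the idea, not the SDK's TypeDescriptor implementation: Fn, Outer, InnerFn, and resolve are hypothetical names, and the sketch handles only a bare type variable bound by the direct superclass of the enclosing instance's class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ResolveAgainstOuterDemo {

  // Hypothetical stand-in for DoFn; only the output type parameter is modeled.
  static class Fn<OutputT> {}

  // Hypothetical stand-in for the abstract Processor<T>.
  abstract static class Outer<T> {
    class InnerFn extends Fn<T> {
      // Resolves the declared output type (the variable T) against the
      // runtime class of the enclosing Outer instance.
      Type resolvedOutputType() {
        Type declared =
            ((ParameterizedType) getClass().getGenericSuperclass())
                .getActualTypeArguments()[0];
        return resolve(declared, Outer.this.getClass());
      }
    }

    InnerFn newInnerFn() {
      return new InnerFn();
    }
  }

  // Substitutes a type variable using the bindings recorded in the direct
  // generic superclass of context; returns the input unchanged otherwise.
  static Type resolve(Type declared, Class<?> context) {
    Type superType = context.getGenericSuperclass();
    if (!(declared instanceof TypeVariable)
        || !(superType instanceof ParameterizedType)) {
      return declared;
    }
    ParameterizedType parameterized = (ParameterizedType) superType;
    TypeVariable<?>[] variables =
        ((Class<?>) parameterized.getRawType()).getTypeParameters();
    Type[] arguments = parameterized.getActualTypeArguments();
    for (int i = 0; i < variables.length; i++) {
      if (variables[i].equals(declared)) {
        return arguments[i];
      }
    }
    return declared;
  }

  public static void main(String[] args) {
    // The anonymous subclass binds T to String, so the inner class can resolve it.
    Outer<String> outer = new Outer<String>() {};
    System.out.println(outer.newInnerFn().resolvedOutputType()); // prints "class java.lang.String"
  }
}
```

Both steps are needed together: the anonymous subclass records the binding, and the inner class has to look it up through the enclosing instance's class rather than through its own.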

The complete code with both changes applied is shown below (see also Github Issue № 57).

class CustomType<T extends Serializable> { ... }

abstract class Processor<T extends Serializable>
    extends PTransform<PCollection<String>,
                       PCollection<KV<String, Set<CustomType<T>>>>> {

  class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
    ...

    @Override
    protected TypeDescriptor<KV<String, Set<CustomType<T>>>> 
    getOutputTypeDescriptor() {
      return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
          Processor.this.getClass()) {};
    }
  }

  class Merger extends BinaryCombineFn<Set<CustomType<T>>> { ... }

  @Override
  public PCollection<KV<String, Set<CustomType<T>>>> apply(PCollection<String> items) {

    PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
        items.apply(ParDo.of(new ParDoFn()));

    PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
        partitionedItems.apply(
            Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
                new Merger()));

    return combinedItems;
  }
}

PCollection<String> input = …;
input.apply(new Processor<String>() {});

An alternative is to override Processor.getDefaultOutputCoder, or to call setCoder explicitly on the intermediate partitionedItems collection.


Source: https://habr.com/ru/post/1606646/

