Configuring custom coders and handling parameterized types

I have two questions about coder issues I ran into with my Dataflow pipeline.

  • How do I specify a coder for my custom data types? The class consists of just three items: two Doubles and another parameterized property. I tried annotating the type with SerializableCoder, but I still get the error "com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException: Cannot provide coder based on value with class interface java.util.Set: No CoderFactory has been registered for the class." The Set actually contains the parameterized custom data type, so I assume the custom data type is the problem. I couldn't find adequate documentation or examples showing the right way to do this. Please point me to the right place if it exists.
  • Even without a custom data type, whenever I switch to parameterized versions of my transform functions, I get coder errors. Specifically, inside a parameterized composite transform, ParDo works with parameterized types, but when I apply Combine.PerKey to the PCollection produced by that ParDo, it throws a CoderNotFoundException.

Any help with these two issues would be appreciated, since I've been stuck on this for a while.

1 answer

It looks like you were bitten by two problems. Thank you for bringing them to our attention! Fortunately, there are easy workarounds for both, while we are improving the situation.

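The first problem is that the default coder registry has no entry for Set.class, even though SetCoder exists. We have filed GitHub issue #56 to track the fix. In the meantime, you can register the coder yourself: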

pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
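Why registration is needed: a coder for a Set<T> is built from a coder for its element type T, so the registry must have a factory keyed by Set.class before it can infer anything for a Set value. The stdlib-only sketch below illustrates that composite shape. ElementCoder, encodeSet and decodeSet are hypothetical names, and the size-then-elements layout is an assumption chosen for illustration, not the SDK's actual SetCoder wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class SetCoderSketch {
  // Hypothetical element coder interface; NOT the Dataflow SDK's Coder.
  interface ElementCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;
    T decode(DataInputStream in) throws IOException;
  }

  // Element coder for strings (modified UTF-8 via writeUTF/readUTF).
  static final ElementCoder<String> STRING_CODER = new ElementCoder<String>() {
    @Override
    public void encode(String value, DataOutputStream out) throws IOException {
      out.writeUTF(value);
    }

    @Override
    public String decode(DataInputStream in) throws IOException {
      return in.readUTF();
    }
  };

  // Composite set coder: writes the size, then each element using the element coder.
  static <T> byte[] encodeSet(Set<T> set, ElementCoder<T> elementCoder) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(set.size());
      for (T element : set) {
        elementCoder.encode(element, out);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Inverse of encodeSet: reads the size, then decodes that many elements.
  static <T> Set<T> decodeSet(byte[] data, ElementCoder<T> elementCoder) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      int size = in.readInt();
      Set<T> result = new LinkedHashSet<>();
      for (int i = 0; i < size; i++) {
        result.add(elementCoder.decode(in));
      }
      return result;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Set<String> tags = new LinkedHashSet<>(Arrays.asList("red", "green"));
    byte[] encoded = encodeSet(tags, STRING_CODER);
    System.out.println(decodeSet(encoded, STRING_CODER)); // [red, green]
  }
}
```

Without a coder for the elements there is nothing to build the set coder from, which mirrors the error in the question.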

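The second problem is that the @DefaultCoder annotation is currently not honored for parameterized types. We have filed GitHub issue #57 to track this. The most reliable way to make sure SerializableCoder is used for CustomType everywhere is to register a CoderFactory that returns SerializableCoder. Suppose your type looks like this: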

import java.io.Serializable;

public class CustomType<T extends Serializable> implements Serializable {
  T field;
}

You can then register a CoderFactory that always returns SerializableCoder:

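import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderFactory;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import java.util.Collections;
import java.util.List;

pipeline.getCoderRegistry().registerCoder(CustomType.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? extends Coder<?>> componentCoders) {
    // No matter what T is, ignore the component coders and return SerializableCoder
    return SerializableCoder.of(CustomType.class);
  }

  @Override
  public List<Object> getInstanceComponents(Object value) {
    // Return the T inside your CustomType<T> to enable coder inference for Create.
    // CustomType<?> (not CustomType<Object>) because T is bounded by Serializable.
    return Collections.<Object>singletonList(((CustomType<?>) value).field);
  }
});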

Now, wherever you use a CustomType, the coder registry will choose SerializableCoder for it.
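The effect of that factory can be modeled without the SDK. In the sketch below, which is a hypothetical stand-in rather than the real CoderRegistry, factories are looked up by the raw (erased) class, and the registered factory ignores its component coders and always returns a Java-serialization coder, much as the answer's create() always returns SerializableCoder. ErasedRegistrySketch, ToyCoder, register and coderFor are illustrative names only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ErasedRegistrySketch {
  // Same shape as the CustomType<T> in the answer, plus a constructor.
  static class CustomType<T extends Serializable> implements Serializable {
    T field;

    CustomType(T field) {
      this.field = field;
    }
  }

  // Hypothetical coder abstraction (not the Dataflow SDK's Coder).
  interface ToyCoder {
    byte[] encode(Object value);
    Object decode(byte[] bytes);
  }

  // Analogue of SerializableCoder: plain Java serialization, whatever T is.
  static final ToyCoder JAVA_SERIALIZATION = new ToyCoder() {
    @Override
    public byte[] encode(Object value) {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      return bytes.toByteArray();
    }

    @Override
    public Object decode(byte[] bytes) {
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
        return in.readObject();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      } catch (ClassNotFoundException e) {
        throw new IllegalStateException(e);
      }
    }
  };

  // Keyed by the raw class, so CustomType<String> and CustomType<Integer> share one entry.
  private final Map<Class<?>, Function<List<ToyCoder>, ToyCoder>> factories = new HashMap<>();

  void register(Class<?> rawClass, Function<List<ToyCoder>, ToyCoder> factory) {
    factories.put(rawClass, factory);
  }

  ToyCoder coderFor(Class<?> rawClass, List<ToyCoder> componentCoders) {
    Function<List<ToyCoder>, ToyCoder> factory = factories.get(rawClass);
    if (factory == null) {
      throw new IllegalArgumentException("No factory registered for " + rawClass);
    }
    return factory.apply(componentCoders);
  }

  public static void main(String[] args) {
    ErasedRegistrySketch registry = new ErasedRegistrySketch();
    // Like the answer's CoderFactory: ignore the component coders entirely.
    registry.register(CustomType.class, componentCoders -> JAVA_SERIALIZATION);

    ToyCoder coder = registry.coderFor(CustomType.class, Collections.<ToyCoder>emptyList());
    CustomType<?> decoded = (CustomType<?>) coder.decode(coder.encode(new CustomType<Integer>(42)));
    System.out.println(decoded.field); // 42
  }
}
```

Because the lookup key is the erased class, the type argument never influences which coder is chosen, which is exactly why the workaround sidesteps the parameterized-type problem.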

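Note that SerializableCoder is not deterministic (the encoded bytes may differ for two objects that are equal()), so it is not suitable for encoding keys, for example the keys of a GroupByKey.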
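A quick way to see this nondeterminism: SerializableCoder relies on Java serialization, and two LinkedHashSet instances that are equal() but were filled in a different order serialize to different bytes, because the elements are written in iteration order. Since GroupByKey compares keys by their encoded form, such keys could land in separate groups. The class name and the javaSerialize helper are illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class SerializationDeterminism {
  // Plain Java serialization, the mechanism SerializableCoder is built on.
  static byte[] javaSerialize(Object value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  public static void main(String[] args) {
    // Equal sets: Set.equals ignores iteration order.
    Set<String> first = new LinkedHashSet<>(Arrays.asList("a", "b"));
    Set<String> second = new LinkedHashSet<>(Arrays.asList("b", "a"));
    System.out.println("equal: " + first.equals(second)); // equal: true
    // Serialization writes elements in iteration order, so the bytes differ.
    System.out.println("same bytes: "
        + Arrays.equals(javaSerialize(first), javaSerialize(second))); // same bytes: false
  }
}
```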

Source: https://habr.com/ru/post/1606643/
