It looks like you were bitten by two problems. Thank you for bringing them to our attention! Fortunately, there are easy workarounds for both, while we are improving the situation.
, Set.class - SetCoder. GitHub № 56, . :
pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
, , @DefaultCoder . Github № 57, . , SerializableCoder CustomType, - CoderFactory , SerializableCoder. , :
public class CustomType<T extends Serializable> implements Serializable {
T field;
}
CoderFactory, SerializableCoder:
pipeline.getCoderRegistry().registerCoder(CustomType.class, new CoderFactory() {
@Override
public Coder<?> create(List<? extends Coder<?>>) {
return SerializableCoder.of(CustomType.class);
}
@Override
public List<Object> getInstanceComponents(Object value) {
return Collections.singletonList(((CustomType<Object>) value).field);
}
});
, CustomType , coder SerializableCoder.
, SerializableCoder ( , equals()), , , GroupByKey.