How to properly handle errors in a custom MapFunction?

I have implemented a MapFunction for an Apache Flink stream. It parses incoming elements and converts them to a different format, but sometimes an error can occur (for example, when the incoming data is invalid).

I see two possible ways to handle it:

  • Ignore invalid elements. But it seems I cannot simply ignore errors, because a MapFunction has to return an output element for every input element.
  • Split the input elements into valid and invalid ones. But it seems I would need a different function for this.

So, I have two questions:

  • How should I handle errors in my MapFunction?
  • How should such conversion functions be implemented properly?
2 answers

You can use a FlatMapFunction instead of a MapFunction. This allows you to emit an element only if it is valid. Here is an example implementation:

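```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// input is a DataStream<String>
input.flatMap(new FlatMapFunction<String, Long>() {
    @Override
    public void flatMap(String input, Collector<Long> collector) throws Exception {
        try {
            Long value = Long.parseLong(input);
            collector.collect(value); // emit only successfully parsed values
        } catch (NumberFormatException e) {
            // ignore invalid data: nothing is emitted for this element
        }
    }
});
```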
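Running the snippet above requires a Flink job, so here is a dependency-free sketch of the same idea that runs on its own. The `Collector` interface is a hypothetical stand-in for Flink's `org.apache.flink.util.Collector`, and `run` plays the role of the runtime feeding elements to `flatMap`. What it illustrates is that a flat-map may emit zero outputs for an input, which a `MapFunction` cannot do.

```java
import java.util.ArrayList;
import java.util.List;

public class FlatMapFilterSketch {

    // Hypothetical stand-in for Flink's org.apache.flink.util.Collector:
    // a sink that accepts zero or more output records per input record.
    interface Collector<T> {
        void collect(T record);
    }

    // Same logic as the FlatMapFunction above: emit the parsed value
    // only when parsing succeeds, emit nothing otherwise.
    static void flatMap(String input, Collector<Long> collector) {
        try {
            collector.collect(Long.parseLong(input));
        } catch (NumberFormatException e) {
            // invalid element: nothing is emitted, so it is effectively dropped
        }
    }

    // Feeds every input to flatMap and gathers whatever was emitted.
    static List<Long> run(List<String> inputs) {
        List<Long> out = new ArrayList<>();
        for (String s : inputs) {
            flatMap(s, out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("1", "abc", "42", "")));
    }
}
```

Running `main` prints `[1, 42]`: the inputs `"abc"` and `""` fail to parse and are simply not emitted.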

This builds on the idea from @Till Rohrmann above. I'm adding it as an answer instead of a comment for better formatting.

I think one way to implement split + select could be to use a ProcessFunction with side outputs. My graph would look something like this:

```
Source --> ValidateProcessFunction ---good data--> UDF ---> SinkToOutput
                      \
                       \---bad data---> SinkToErrorChannel
```

Will this work? Is there a better way?
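As far as I can tell, this graph matches how Flink side outputs are intended to be used. In Flink, the `ProcessFunction` would emit good records with `out.collect(...)` and bad ones with `ctx.output(badDataTag, value)`, where `badDataTag` is an `OutputTag<String>` (the name is only illustrative), and the error stream would be obtained with `getSideOutput(badDataTag)` on the operator returned by `process(...)`. The sketch below models only the routing logic in plain Java so that it runs without Flink; the two `Consumer`s are hypothetical stand-ins for the main output and the side output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SideOutputSketch {

    // Results of one run: the main output and the side output.
    static final class Routed {
        final List<Long> good = new ArrayList<>();   // would flow to UDF -> SinkToOutput
        final List<String> bad = new ArrayList<>();  // would flow to SinkToErrorChannel
    }

    // Models ValidateProcessFunction.processElement. The Consumers are
    // hypothetical stand-ins for Flink's Collector (main output) and
    // ctx.output(badDataTag, ...) (side output).
    static void processElement(String value, Consumer<Long> out, Consumer<String> sideOut) {
        try {
            out.accept(Long.parseLong(value));
        } catch (NumberFormatException e) {
            sideOut.accept(value); // keep the raw input so the error sink can inspect it
        }
    }

    // Feeds every input through processElement, routing it to one of the two lists.
    static Routed run(List<String> inputs) {
        Routed r = new Routed();
        for (String s : inputs) {
            processElement(s, r.good::add, r.bad::add);
        }
        return r;
    }

    public static void main(String[] args) {
        Routed r = run(List.of("7", "x", "13", "1.5"));
        System.out.println("SinkToOutput: " + r.good);
        System.out.println("SinkToErrorChannel: " + r.bad);
    }
}
```

Running `main` prints `SinkToOutput: [7, 13]` and `SinkToErrorChannel: [x, 1.5]`. Compared with silently dropping invalid records in a `flatMap`, this keeps the raw bad input available for the error sink, at the cost of a second stream to wire up.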


Source: https://habr.com/ru/post/1245319/

