How to handle errors and not commit when using Kafka Streams DSL

For Kafka threads, if we use a lower level API, we can control the commit or not. Therefore, if problems arise in our code, and we do not want to transmit this message. In this case, Kafka will re-update this message several times until the problem is fixed.

But how to control whether to transmit a message when using its higher-level DSL interface?

Resources

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

+6
source share
1 answer

. " " - , ( API, DSL). ProcessorContext#commit() . , #commit() , . , , #commit(). commit.interval.m (cf. http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application)

"" , :

  • , , " " (. ).
  • , "" (, Processor#process() KeyValueMapper#apply(), , ( , -, .. , - . heartbeat.interval.ms 0.10.1 session.timeout.ms [KIP-62])
  • , , StateStore . , , Streams (, ). , , .

, StreamThread ( , : http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code. StreamThread , KafkaStreams .

, , , Streams , (, , ). (3), StateStore "" .

+8

Source: https://habr.com/ru/post/1014711/


All Articles