How to do it manually using Kafka Stream?

Is there a way to do manual recording using the Kafka stream?

Usually with KafkaConsumer I do something like below:

 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ // process records } consumer.commitAsync(); } 

Where I call commit manually. I do not see a similar API for KStream .

+5
source share
1 answer

Commits are handled by Streams internally and fully automatically, and there is usually no reason to do it manually. Note that Streams handles this differently than consumer auto-commenting - in fact, auto-commit is disabled for the internal consumer, and Streams manages the commit manually. The reason is that commits can only occur at certain points during processing to ensure that data cannot be lost (there are many internal dependencies regarding updating status and cleaning results).

For more frequent commits, you can reduce the commit interval using the StreamsConfig commit.interval.ms parameter.

However, manual commands are possible indirectly using the low-level processor API. You can use the context object provided with the init() method to call context#commit() . Please note that this is only a “thread request” to commit as soon as possible - it does not send the commit directly.

+12
source

Source: https://habr.com/ru/post/1266738/


All Articles