How to make consumer shifts with Camel-kafka?

I use camel apache to integrate with my kafka post. I also use JAVA DSL to consume messages from kafka endpoint.

Using the apache kafka API, he knows how to comment on consumer offsets with given properties.

if I disbale auto commit in camel-kafka component, how can I commit offsets in apcahe camel, then?

I use the lower endpoint to disable auto-commit in Apache Camel

kafka.consumer.uri = kafka://{{kafka.host}}?zookeeperHost={{zookeeper.host}}&zookeeperPort={{zookeeper.port}}&groupId={{kafka.consumerGroupId}}&topic={{kafka.topic.transformer_t}}&{{kafka.uri.options}} kafka.uri.options = autoCommitEnable=false 
+5
source share
1 answer

Support for manual offset locking has been added to CAMEL-11933 and is supported since version 2.21.0. You can enable it with the allowManualCommit = true option. This option will display the CamelKafkaManualCommit header containing an instance of KafkaManualCommit , which can then be used, for example, in Processor .

 from("kafka:topic?groupId=group&autoCommitEnable=false&allowManualCommit=true") //... .process(exchange -> { exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class).commitSync(); }); 
0
source

Source: https://habr.com/ru/post/1231930/


All Articles