How to manually control offset fixation using camel-kafka?

I use the camel kafka component, and I don’t understand what happens under the hood with offsets. As you can see below, I collect the records, and I think that for my use case it only makes sense to fix the offsets after the records were saved in SFTP.

Is it possible to manually control when I can commit?

private static class MyRouteBuilder extends RouteBuilder { @Override public void configure() throws Exception { from("kafka:{{mh.topic}}?" + getKafkaConfigString()) .unmarshal().string() .aggregate(constant(true), new MyAggregationStrategy()) .completionSize(1000) .completionTimeout(1000) .setHeader("CamelFileName").constant("transactions-" + (new Date()).getTime()) .to("sftp://" + getSftpConfigString()) // how to commit offset only after saving messages to SFTP? ; } private final class MyAggregationStrategy implements AggregationStrategy { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String oldBody = oldExchange.getIn().getBody(String.class); String newBody = newExchange.getIn().getBody(String.class); String body = oldBody + newBody; oldExchange.getIn().setBody(body); return oldExchange; } } } private static String getSftpConfigString() { return "{{sftp.hostname}}/{{sftp.dir}}?" + "username={{sftp.username}}" + "&password={{sftp.password}}" + "&tempPrefix=.temp." + "&fileExist=Append" ; } private static String getKafkaConfigString() { return "brokers={{mh.brokers}}" + "&saslMechanism={{mh.saslMechanism}}" + "&securityProtocol={{mh.securityProtocol}}" + "&sslProtocol={{mh.sslProtocol}}" + "&sslEnabledProtocols={{mh.sslEnabledProtocols}}" + "&sslEndpointAlgorithm={{mh.sslEndpointAlgorithm}}" + "&saslJaasConfig={{mh.saslJaasConfig}}" + "&groupId={{mh.groupId}}" ; } 
+1
source share
3 answers

No, you can’t. Kafka automatically locks in the background every X seconds (you can configure this).

Camel-kafka does not support manual commit. Also, this would not be possible, since the aggregator was separated from the consumer of the kafka and its consumer, which performs the fixation.

+2
source

I think this is a change in the latest version of the camel (2.22.0) ( document ), you should be able to do this.

 // Endpoint configuration &autoCommitEnable=false&allowManualCommit=true public void process(Exchange exchange) { KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); manual.commitSync(); } 
0
source

You can control the offset offset manually even in a multi-threaded route (using the aggregator for example) using the offset repository ( Camel Documentation )

0
source

Source: https://habr.com/ru/post/1231932/


All Articles