I use the camel kafka component, and I donβt understand what happens under the hood with offsets. As you can see below, I collect the records, and I think that for my use case it only makes sense to fix the offsets after the records were saved in SFTP.
Is it possible to manually control when I can commit?
private static class MyRouteBuilder extends RouteBuilder { @Override public void configure() throws Exception { from("kafka:{{mh.topic}}?" + getKafkaConfigString()) .unmarshal().string() .aggregate(constant(true), new MyAggregationStrategy()) .completionSize(1000) .completionTimeout(1000) .setHeader("CamelFileName").constant("transactions-" + (new Date()).getTime()) .to("sftp://" + getSftpConfigString())
source share