Apache Camel: asyncCallback replies changed between Camel 2.17 and Camel 2.18/2.19

In Camel versions up to 2.17.x, every Synchronization.onComplete() callback for replies coming back through activemq:queue was invoked in a separate thread, so even when processing a reply inside onComplete was very slow, the other replies were not held up in the queue. As I understand it, this was the effect of the configuration asyncConsumer=true&defaultTaskExecutorType=ThreadPool&concurrentConsumers=2&maxConcurrentConsumers=100. The output for this test case is:

    Received async reply: 2000 OK
    Received async reply: 3000 OK
    Received async reply: 5000 OK
    Received async reply: 9000 OK
    Received async reply: 10000 OK
    Finished async reply: 2000 OK
    Finished async reply: 3000 OK
    Finished async reply: 5000 OK
    Finished async reply: 9000 OK
    Finished async reply: 10000 OK

So all the "Finished" lines are logged after the "Received" lines, since each callback is invoked in a different thread. Every reply is received asynchronously, and lengthy processing of one reply does not delay the reception of the others.

After upgrading to Camel 2.18.x (or 2.19.x), this is no longer the case. Now, while one reply is being received and processed (a lengthy operation), the reception of the others is blocked. The reason is that the same thread is reused to call Synchronization.onComplete(), so the remaining replies stay queued until the current processing completes.

    Received async reply: 2000 OK
    Received async reply: 10000 OK
    Received async reply: 9000 OK
    Finished async reply: 2000 OK
    Received async reply: 3000 OK
    Finished async reply: 3000 OK
    Finished async reply: 9000 OK
    Finished async reply: 10000 OK
    Received async reply: 5000 OK
    Finished async reply: 5000 OK
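
To make the difference between the two behaviours concrete, here is a minimal plain-Java sketch. It is only a simplified model and does not use any Camel classes: the class name ReplyDispatchDemo, the run method, and the short delays are all made up for illustration. It runs the same slow callbacks once on a single dispatcher thread and once on a pool with one thread per reply, and records the order of the "Received"/"Finished" events.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java model of reply dispatch; no Camel classes are involved.
class ReplyDispatchDemo {

    // Runs one simulated slow callback per delay on the given executor and
    // returns the "Received"/"Finished" events in the order they occurred.
    static List<String> run(ExecutorService dispatcher, int[] delaysMs) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        for (int delay : delaysMs) {
            dispatcher.submit(() -> {
                events.add("Received " + delay);
                try {
                    Thread.sleep(delay); // stands in for slow onComplete work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                events.add("Finished " + delay);
            });
        }
        dispatcher.shutdown();
        try {
            if (!dispatcher.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        int[] delays = {300, 100, 200};
        // One dispatcher thread: each reply waits for the previous callback.
        System.out.println(run(Executors.newSingleThreadExecutor(), delays));
        // One thread per reply: no reply has to wait for another callback.
        System.out.println(run(Executors.newFixedThreadPool(delays.length), delays));
    }
}
```

With the single-thread executor every "Received" is immediately followed by its own "Finished", which is the extreme form of the blocking seen after the upgrade. With the pool, no reply waits for another callback to finish, which corresponds to the 2.17 output.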

I expected these new properties, replyToConcurrentConsumers=2&replyToMaxConcurrentConsumers=100, to configure exactly this: when a new reply arrives and all current threads are busy, it is handled in a new thread (unless the maximum count has already been reached, in which case the existing threads work through the replies and the rest are queued).

Maybe I am getting something wrong, but how can I set up the route to get the same result as in Camel 2.17? Increasing the replyToConcurrentConsumers property works, but I think the number of consumers should scale dynamically, up to replyToMaxConcurrentConsumers, when necessary.
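
One possible workaround that does not depend on how the reply consumers are sized, offered only as a sketch and not as a confirmed fix, is to hand the slow work to your own executor so that the thread delivering the reply returns immediately. The helper below is hypothetical and written in plain Java; OffloadingReplyHandler and its method names are not part of Camel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical helper, not part of Camel: moves slow reply handling
// off the thread that delivered the reply.
class OffloadingReplyHandler<T> {
    private final ExecutorService workers;
    private final Consumer<T> slowWork;

    OffloadingReplyHandler(ExecutorService workers, Consumer<T> slowWork) {
        this.workers = workers;
        this.slowWork = slowWork;
    }

    // Returns immediately; the slow work runs later on a worker thread.
    void onReply(T reply) {
        workers.execute(() -> slowWork.accept(reply));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        OffloadingReplyHandler<String> handler = new OffloadingReplyHandler<>(
                pool, reply -> System.out.println("Finished async reply: " + reply));
        handler.onReply("2000 OK");
        handler.onReply("3000 OK");
        pool.shutdown(); // already submitted work still runs to completion
    }
}
```

In the test class, onComplete would then only call something like handler.onReply(exchange), and the latch countdown would have to move to the end of the offloaded work, because the callback itself now returns before the processing is finished.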

Code:

    import java.util.concurrent.CountDownLatch;

    import org.apache.camel.CamelContext;
    import org.apache.camel.Exchange;
    import org.apache.camel.ExchangePattern;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.impl.DefaultCamelContext;
    import org.apache.camel.impl.DefaultExchange;
    import org.apache.camel.spi.Synchronization;
    import org.apache.camel.support.SynchronizationAdapter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class AsyncCallbackTest {

        private static final Logger LOG = LoggerFactory.getLogger(AsyncCallbackTest.class);

        public static void main(String[] args) throws Exception {
            // create and setup a default Camel context
            final CamelContext camelContext = new DefaultCamelContext();
            setupRoutes(camelContext);
            camelContext.start();

            // real test
            asyncCallbackTest(camelContext);

            camelContext.stop();
            System.exit(0);
        }

        private static void setupRoutes(CamelContext camelContext) throws Exception {
            camelContext.addRoutes(new RouteBuilder() {
                public void configure() {
                    // consumer side: answers every request with "<body> OK"
                    from("activemq://queue:asyncTest?asyncConsumer=true&defaultTaskExecutorType=ThreadPool"
                            + "&concurrentConsumers=2&maxConcurrentConsumers=100")
                        .process(exchange -> {
                            final String msg = String.valueOf(exchange.getIn().getBody());
                            exchange.getOut().setBody(msg + " OK");
                        });
                }
            });
        }

        private static void asyncCallbackTest(CamelContext camelContext) throws Exception {
            final int[] delays = new int[]{9000, 10000, 5000, 2000, 3000};
            final CountDownLatch countDownLatch = new CountDownLatch(delays.length);

            final Synchronization callback = new SynchronizationAdapter() {
                @Override
                public void onComplete(Exchange exchange) {
                    LOG.info("Received async reply: " + exchange.getOut().getBody());
                    // simulate slow processing: wait for the delay sent in the request
                    final int delay = (int) exchange.getIn().getBody();
                    synchronized (this) {
                        try {
                            this.wait(delay);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    LOG.info("Finished async reply: " + exchange.getOut().getBody());
                    super.onComplete(exchange);
                }

                @Override
                public void onDone(Exchange exchange) {
                    countDownLatch.countDown();
                }
            };

            final ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
            for (int i = 0; i < delays.length; i++) {
                // send each delay as an InOut request; the reply triggers the callback
                final Exchange exchange = new DefaultExchange(camelContext);
                exchange.getIn().setBody(delays[i]);
                exchange.setPattern(ExchangePattern.InOut);
                producerTemplate.asyncCallback("activemq://queue:asyncTest?"
                        + "replyToConcurrentConsumers=2&replyToMaxConcurrentConsumers=100", exchange, callback);
            }

            // wait until every callback has completed
            countDownLatch.await();
            camelContext.stop();
        }
    }


Source: https://habr.com/ru/post/1270999/

