Apachev Camel - cumulative route with the exception of

I want to use Apache Camel's parallel multicast for 3 routes and aggregate (and wait) 2 of them, and leave the 3rd on my own (the third should not block the first two). I also need to handle these two in all cases, which means that if one of them does not work (for example, throws an exception during processing), it must also be aggregated.

From what I understood from the Camel documentation, this behavior should be “default” unless you specify say stopOnException. But what happens is that an exchange with an exception never hits my AggregationStrategy. What is strange is that the processors behind the unit are executed even when there is a completion. (2).

So my question is: how is it possible that the route after aggregation continues without exchanging exchanges with the exception in AggregationStrategy? And how to solve my case? Note that executing .to ("direct: sync") is not the case because this exception can be thrown from a route that completely leaves the multicast aggregate part of the routes.

Here is a sample code (ExceptionThrowingProcessor throws MyException, PropertySetterProcessor doesn't matter):

CamelContext context = new DefaultCamelContext(); RouteBuilder builder = new RouteBuilder() { @Override public void configure() throws Exception { } }; builder.onException(MyException.class) .process(new PropertySetterProcessor("a", "onException")) .handled(true); builder.from("direct:input") .process(new PropertySetterProcessor("a", "before-multicast")) .multicast() .parallelProcessing() .shareUnitOfWork() .to("direct:part1", "direct:part2", "direct:part3") builder.from("direct:part1") .process(new PropertySetterProcessor("a", "part1")) .to("direct:sync"); builder.from("direct:part2") .process(new PropertySetterProcessor("a", "part2")) .process(new ExceptionThrowingProcessor("oops")) // throws MyException .to("direct:sync"); builder.from("direct:part3") .process(new PropertySetterProcessor("a", "part3")); // don't want this to be aggregated within direct:sync builder.from("direct:sync") .aggregate(new TestAggregationStrategy()) // strategy.aggregate is called only once (from part1) but not from part2 :( .constant(true) .completionSize(2) .process(new PropertySetterProcessor("a", "after-aggregation")); context.addRoutes(builder); context.start(); ProducerTemplate template = context.createProducerTemplate(); template.send("direct:input", new DefaultExchange(context, ExchangePattern.InOut)); 
+5
source share

Source: https://habr.com/ru/post/1263916/


All Articles