Spring Integration: how to handle service exceptions after an aggregator?

I have an application based on Spring Integration (4.0.4.RELEASE) and RabbitMQ. My flow is as follows:

Messages are put on a queue by one process (no reply is expected): Gateway → Channel → RabbitMQ

And then consumed by another process:

  RabbitMQ --1--> inbound-channel-adapter A --2--> chain B --3--> aggregator C --4--> service-activator D --5--> final service-activator E

Explanations and Context

Specifically, nowhere in my application do I use a splitter: aggregator C simply waits for enough messages to arrive, or for a timeout to expire, and then forwards the batch to service D. Messages can sit in aggregator C for quite some time and must NOT be considered consumed there. They should be consumed only once service D has completed successfully. Therefore, I use MANUAL acknowledgment on inbound-channel-adapter A, and service E is responsible for acknowledging the batch.

Custom aggregator

I solved the acknowledgment problem I had with AUTO mode by redefining the aggregator. Indeed, messages are acknowledged immediately if any asynchronous step occurs in the flow (see the question here). So I switched to MANUAL acknowledgment and implemented the aggregator as follows:

    <bean class="org.springframework.integration.config.ConsumerEndpointFactoryBean">
        <property name="inputChannel" ref="channel3"/>
        <property name="handler">
            <bean class="org.springframework.integration.aggregator.AggregatingMessageHandler">
                <!-- Custom processor that aggregates payloads and headers -->
                <constructor-arg name="processor">
                    <bean class="com.test.AMQPAggregator"/>
                </constructor-arg>
                <property name="correlationStrategy">
                    <bean class="com.test.AggregatorDefaultCorrelationStrategy"/>
                </property>
                <property name="releaseStrategy">
                    <bean class="com.test.AggregatorMongoReleaseStrategy"/>
                </property>
                <property name="messageStore" ref="messageStoreBean"/>
                <property name="expireGroupsUponCompletion" value="true"/>
                <property name="sendPartialResultOnExpiry" value="true"/>
                <property name="outputChannel" ref="channel4"/>
            </bean>
        </property>
    </bean>

    <bean id="messageStoreBean" class="org.springframework.integration.store.SimpleMessageStore"/>

    <!-- Expires groups older than the timeout; the ref must match the store bean id above -->
    <bean id="messageStoreReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
        <property name="messageGroupStore" ref="messageStoreBean"/>
        <property name="timeout" value="${myapp.timeout}"/>
    </bean>

    <task:scheduled-tasks>
        <task:scheduled ref="messageStoreReaperBean" method="run" fixed-rate="2000"/>
    </task:scheduled-tasks>

I wanted to aggregate the headers differently and keep the highest amqp_deliveryTag value across the group for later reuse in service E (see this thread). This has worked well so far, except that it is much more verbose than the typical aggregator namespace configuration (see this old Jira ticket).
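For comparison, here is a rough sketch of what the namespace form of an aggregator usually looks like. The bean ids aggregatorBean, correlationStrategyBean and releaseStrategyBean, and the method name aggregate, are hypothetical names introduced for illustration. The custom header handling in the question lives in com.test.AMQPAggregator, and whether this shorthand can carry it depends on the Spring Integration version, so treat this as a comparison of verbosity rather than a drop-in replacement.

```xml
<!-- Sketch only: namespace form of an aggregator, shown for comparison.
     "aggregatorBean" and its "aggregate" method are hypothetical names. -->
<bean id="correlationStrategyBean" class="com.test.AggregatorDefaultCorrelationStrategy"/>
<bean id="releaseStrategyBean" class="com.test.AggregatorMongoReleaseStrategy"/>

<int:aggregator input-channel="channel3"
                output-channel="channel4"
                ref="aggregatorBean"
                method="aggregate"
                correlation-strategy="correlationStrategyBean"
                release-strategy="releaseStrategyBean"
                message-store="messageStoreBean"
                expire-groups-upon-completion="true"
                send-partial-result-on-expiry="true"/>
```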

Services

I just use the basic configurations:

chain B

    <int:chain input-channel="channel2" output-channel="channel3">
        <int:header-enricher>
            <!-- Probably useless -->
            <int:error-channel ref="errorChannel"/>
        </int:header-enricher>
        <int:json-to-object-transformer/>
        <int:transformer ref="serviceABean" method="doThis"/>
        <int:transformer ref="serviceBBean" method="doThat"/>
    </int:chain>

service D

    <int:service-activator ref="serviceDBean"
                           method="doSomething"
                           input-channel="channel4"
                           output-channel="channel5"/>

Error management

Since I rely on MANUAL acknowledgment, I also need to manually reject messages if an exception occurs. I have the following definition for inbound-channel-adapter A:

    <int-amqp:inbound-channel-adapter channel="channel2"
                                      queue-names="si.queue1"
                                      error-channel="errorChannel"
                                      mapped-request-headers="*"
                                      acknowledge-mode="MANUAL"
                                      prefetch-count="${properties.prefetch_count}"
                                      connection-factory="rabbitConnectionFactory"/>

I use the following definition for errorChannel:

    <int:chain input-channel="errorChannel">
        <int:transformer ref="errorUnwrapperBean" method="unwrap"/>
        <int:service-activator ref="amqpAcknowledgerBean" method="rejectMessage"/>
    </int:chain>

ErrorUnwrapper is based on this code, and exception detection and message rejection both work well until messages reach aggregator C.

Problem

If an exception occurs while service-activator D is processing messages, I see the exception in the logs, but errorChannel does not seem to receive any message, and my ErrorUnwrapper's unwrap() method is not called. The abridged stack traces I see when an Exception("ahahaha") is thrown are as follows:

    2014-09-23 16:41:18,725 ERROR o.s.i.s.SimpleMessageStore:174: Exception in expiry callback
    org.springframework.messaging.MessageHandlingException: java.lang.Exception: ahahaha
        at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78)
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:170)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        (...)
    Caused by: java.lang.Exception: ahahaha
        at com.myapp.ServiceD.doSomething(ServiceD.java:153)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        (...)
    2014-09-23 16:41:18,733 ERROR o.s.s.s.TaskUtils$LoggingErrorHandler:95: Unexpected error occurred in scheduled task.
    org.springframework.messaging.MessageHandlingException: java.lang.Exception: ahahaha
        (...)

Question

How can I tell the services that process messages coming out of such an aggregator to publish their errors to errorChannel? I tried setting the error channel in the message header through a header-enricher, without any luck. I use the default errorChannel definition, but I also tried changing its name and overriding it. I am out of ideas here, and although I found this and that, I was not able to get it to work. Thanks in advance for your help!

1 answer

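As you can see from the stack trace, the processing here is started by the MessageGroupStoreReaper, which runs on the default ThreadPoolTaskScheduler. That thread does not originate from the inbound-channel-adapter, so the adapter's error-channel never sees the exception; the scheduler's default error handler only logs it (the TaskUtils$LoggingErrorHandler entry in your log).

So you need to provide a custom scheduler bean whose error handler publishes to errorChannel: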

    <bean id="scheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
        <property name="errorHandler">
            <bean class="org.springframework.integration.channel.MessagePublishingErrorHandler">
                <property name="defaultErrorChannel" ref="errorChannel"/>
            </bean>
        </property>
    </bean>

    <task:scheduled-tasks scheduler="scheduler">
        <task:scheduled ref="messageStoreReaperBean" method="run" fixed-rate="2000"/>
    </task:scheduled-tasks>

That said, I do see the benefit of an error-channel attribute on <aggregator>, since it really has several entry points from separate, detached threads whose errors we cannot otherwise handle in the normal way.
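A possibly shorter variant of the scheduler configuration above, offered as a sketch rather than a verified fix: Spring Integration registers its own scheduler bean named taskScheduler, and as far as I know that scheduler is already set up with a MessagePublishingErrorHandler that publishes to errorChannel. Assuming that bean is present in this context and has not been replaced by a user-defined one, the reaper could simply be scheduled on it:

```xml
<!-- Sketch: run the reaper on the scheduler bean that Spring Integration registers.
     Assumption: a bean named "taskScheduler" exists in this context and still
     uses the framework's default error handler. -->
<task:scheduled-tasks scheduler="taskScheduler">
    <task:scheduled ref="messageStoreReaperBean" method="run" fixed-rate="2000"/>
</task:scheduled-tasks>
```

Defining an explicit scheduler bean, as in the answer, keeps the error handling visible in the configuration, so it may still be the clearer choice.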


Source: https://habr.com/ru/post/1203227/

