Configuring a Spring Integration Aggregator to Combine RabbitMQ Fanout Exchange Responses

I am trying to configure the following using Spring Integration:

  • Send a message to a channel.
  • Have this message fanned out via Rabbit (pub/sub) to n consumers.
  • Each consumer provides a response message.
  • Have Spring Integration aggregate these responses before returning them to the original client.

I have a few problems with this so far ...

  • I use a publish-subscribe channel so that I can set apply-sequence="true", which populates the correlationId, sequenceSize, and sequenceNumber headers. These headers are dropped by DefaultAmqpHeaderMapper, which logs: DEBUG headerName=[correlationId] WILL NOT be mapped

  • The sequenceSize header is only ever set to 1, even though 2 queues are bound to the fanout exchange. Presumably this means messages would be released from the aggregator too soon. I expect this is because I am abusing the publish-subscribe channel just to get apply-sequence="true", and it correctly reports that there is only one subscriber, the int-amqp:outbound-gateway.

My outbound Spring config looks like this:

    <int:publish-subscribe-channel id="output" apply-sequence="true"/>

    <int:channel id="reply">
        <int:interceptors>
            <int:wire-tap channel="logger"/>
        </int:interceptors>
    </int:channel>

    <int:aggregator input-channel="reply" method="combine">
        <bean class="example.SimpleAggregator"/>
    </int:aggregator>

    <int:logging-channel-adapter id="logger" level="INFO"/>

    <int:gateway id="senderGateway"
                 service-interface="example.SenderGateway"
                 default-request-channel="output"
                 default-reply-channel="reply"/>

    <int-amqp:outbound-gateway request-channel="output"
                               amqp-template="amqpTemplate"
                               exchange-name="fanout-exchange"
                               reply-channel="reply"/>

My RabbitMQ configuration is as follows:

    <rabbit:connection-factory id="connectionFactory"/>

    <rabbit:template id="amqpTemplate"
                     connection-factory="connectionFactory"
                     reply-timeout="-1"/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:queue name="a-queue"/>
    <rabbit:queue name="b-queue"/>

    <rabbit:fanout-exchange name="fanout-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="a-queue"/>
            <rabbit:binding queue="b-queue"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

The consumer is as follows:

    <int:channel id="input"/>

    <int-amqp:inbound-gateway request-channel="input"
                              queue-names="a-queue"
                              connection-factory="connectionFactory"
                              concurrent-consumers="1"/>

    <bean id="listenerService" class="example.ListenerService"/>

    <int:service-activator input-channel="input"
                           ref="listenerService"
                           method="receiveMessage"/>

Any suggestions would be great, I suspect I have the wrong end of the stick somewhere ...

New outbound Spring configuration, based on Gary's comments:

    <int:channel id="output"/>

    <int:header-enricher input-channel="output" output-channel="output">
        <int:correlation-id expression="headers['id']"/>
    </int:header-enricher>

    <int:gateway id="senderGateway"
                 service-interface="example.SenderGateway"
                 default-request-channel="output"
                 default-reply-timeout="5000"
                 default-reply-channel="reply"/>

    <int-amqp:outbound-gateway request-channel="output"
                               amqp-template="amqpTemplate"
                               exchange-name="fanout-exchange"
                               reply-channel="reply"
                               mapped-reply-headers="amqp*,correlationId"
                               mapped-request-headers="amqp*,correlationId"/>

    <int:channel id="reply"/>

    <int:aggregator input-channel="reply"
                    output-channel="reply"
                    method="combine"
                    release-strategy-expression="size() == 2">
        <bean class="example.SimpleAggregator"/>
    </int:aggregator>
2 answers

The problem is that Spring Integration (S.I.) is unaware of the fanout exchange topology.

The easiest workaround is to use a custom release strategy

 release-strategy-expression="size() == 2" 

on the aggregator (assuming the fanout is 2). That way you do not need the sequence size, and you can avoid "abusing" the pub/sub channel by using a header enricher instead ...

    <int:header-enricher input-channel="foo" output-channel="bar">
        <int:correlation-id expression="T(java.util.UUID).randomUUID().toString()"/>
    </int:header-enricher>

You can avoid creating a new UUID by using the message id, which is already unique ...

 <int:correlation-id expression="headers['id']" /> 

Finally, you can pass the correlationId header over AMQP by adding

 mapped-request-headers="correlationId" 

to your amqp endpoints.
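
Put together, the three suggestions above could be wired roughly as follows. This is a sketch rather than a tested configuration: the channel ids `toRabbit` and `replies` are made up for illustration, the other names are taken from the question, and it assumes both consumer replies actually arrive on `replies` carrying the same correlationId.

```xml
<!-- Sketch only: "toRabbit" and "replies" are hypothetical channel ids. -->
<int:channel id="output"/>
<int:channel id="toRabbit"/>
<int:channel id="replies"/>

<!-- Copy the (already unique) message id into the correlationId header. -->
<int:header-enricher input-channel="output" output-channel="toRabbit">
    <int:correlation-id expression="headers['id']"/>
</int:header-enricher>

<!-- Map correlationId in both directions so it survives the AMQP round trip. -->
<int-amqp:outbound-gateway request-channel="toRabbit"
                           amqp-template="amqpTemplate"
                           exchange-name="fanout-exchange"
                           mapped-request-headers="amqp*,correlationId"
                           mapped-reply-headers="amqp*,correlationId"
                           reply-channel="replies"/>

<!-- Release the group once two replies (one per bound queue) have arrived. -->
<int:aggregator input-channel="replies"
                method="combine"
                release-strategy-expression="size() == 2">
    <bean class="example.SimpleAggregator"/>
</int:aggregator>
```

The header enricher here reads from one channel and writes to a different one, and the aggregator has no output-channel, so its result goes to the replyChannel header set by the calling gateway. The hard-coded `size() == 2` has to be kept in step with the number of queues bound to the exchange.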


Despite the fact that this question is 3 years old, I am going to answer it because I had the same question.

Spring Integration has a Scatter-Gather implementation that closely matches what your original question describes.

Here is the relevant section from the Spring documentation:

It is a compound endpoint whose goal is to send a message to the recipients and aggregate the results ....

Previously, the pattern could be configured by using discrete components. This enhancement brings more convenient configuration.

The ScatterGatherHandler is a request-reply endpoint that combines a PublishSubscribeChannel (or a RecipientListRouter) and an AggregatingMessageHandler. The request message is sent to the scatter channel, and the ScatterGatherHandler waits for the reply that the aggregator sends to the output channel.
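
As a rough illustration of the quoted description, an XML configuration for the pub/sub ("auction") variant might look like the sketch below. It assumes Spring Integration 4.1 or later, where the `scatter-gather` element is available. The `scatterChannel` id and the `workerA` / `workerB` beans are hypothetical stand-ins for whatever actually handles each copy of the request; `output` is the request channel from the question.

```xml
<!-- Sketch only: "scatterChannel", "workerA" and "workerB" are hypothetical. -->
<int:publish-subscribe-channel id="scatterChannel" apply-sequence="true"/>

<!-- Sends each request to scatterChannel, then waits for the gathered reply. -->
<int:scatter-gather input-channel="output"
                    scatter-channel="scatterChannel"
                    gather-timeout="5000">
    <!-- The gatherer is an aggregator; release once both replies are in. -->
    <int:gatherer release-strategy-expression="size() == 2"/>
</int:scatter-gather>

<!-- Two subscribers; with no output-channel each reply goes to the
     replyChannel header, which the handler points at its gatherer. -->
<int:service-activator input-channel="scatterChannel"
                       ref="workerA" method="handle"/>
<int:service-activator input-channel="scatterChannel"
                       ref="workerB" method="handle"/>
```

Here the scatter channel really does have two subscribers, so apply-sequence="true" yields a sequenceSize of 2. The explicit release expression is kept only to mirror the first answer.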


Source: https://habr.com/ru/post/1442820/

