I am trying to configure the following using Spring Integration:
- Send a message to the channel.
- Report this post about sharing a rabbit (pub / sub) with n users.
- Each consumer provides a response message.
- Have Spring Integration aggregates these responses before returning them to the original client.
I have a few problems with this so far ...
I use the publish-subscribe channel to set the apply-sequence="true" property so that the correId, sequenceSize, and sequenceNumber properties are set. These properties are thrown on DefaultAmqpHeaderMapper . DEBUG headerName=[correlationId] WILL NOT be mapped
The sequenceSize property is set only to 1, even if 2 queues are registered in the branch exchange. Presumably this would mean that messages would be released from the aggregator too soon. I expect this to be because I am abusing the publish-subscribe channel to use apply-sequence="true" , and it is fair to say that there is only one subscriber, int-amqp:outbound-gateway .
My outgoing Spring config looks like this:
<int:publish-subscribe-channel id="output" apply-sequence="true"/> <int:channel id="reply"> <int:interceptors> <int:wire-tap channel="logger"/> </int:interceptors> </int:channel> <int:aggregator input-channel="reply" method="combine"> <bean class="example.SimpleAggregator"/> </int:aggregator> <int:logging-channel-adapter id="logger" level="INFO"/> <int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/> <int-amqp:outbound-gateway request-channel="output" amqp-template="amqpTemplate" exchange-name="fanout-exchange" reply-channel="reply"/>
My rabbitMQ configuration is as follows:
<rabbit:connection-factory id="connectionFactory" /> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" /> <rabbit:admin connection-factory="connectionFactory" /> <rabbit:queue name="a-queue"/> <rabbit:queue name="b-queue"/> <rabbit:fanout-exchange name="fanout-exchange"> <rabbit:bindings> <rabbit:binding queue="a-queue" /> <rabbit:binding queue="b-queue" /> </rabbit:bindings> </rabbit:fanout-exchange>
The consumer is as follows:
<int:channel id="input"/> <int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/> <bean id="listenerService" class="example.ListenerService"/> <int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>
Any suggestions would be great, I suspect I have the wrong end of the stick somewhere ...
Spring's new outbound configuration based on Gary's comments:
<int:channel id="output"/> <int:header-enricher input-channel="output" output-channel="output"> <int:correlation-id expression="headers['id']" /> </int:header-enricher> <int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" /> <int-amqp:outbound-gateway request-channel="output" amqp-template="amqpTemplate" exchange-name="fanout-exchange" reply-channel="reply" mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/> <int:channel id="reply"/> <int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2"> <bean class="example.SimpleAggregator"/> </int:aggregator>
source share