Spring AMQP Integration - Consumer Confirmation

I am testing Spring AMQP with Spring Integration support, using the following configuration and test:

    <rabbit:connection-factory id="connectionFactory" />
    <rabbit:queue name="durableQ"/>

    <int:channel id="consumingChannel">
        <int:queue capacity="2"/> <!-- Message gets acked as soon as it is placed in this queue -->
    </int:channel>

    <int-amqp:inbound-channel-adapter
        channel="consumingChannel"
        queue-names="durableQ"
        connection-factory="connectionFactory"
        concurrent-consumers="1"
        acknowledge-mode="AUTO" />

    public static void main(String[] args) {
        System.out.println("Starting consumer with integration..");
        AbstractApplicationContext context = new ClassPathXmlApplicationContext(
                "classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");
        PollableChannel consumingChannel = context.getBean("consumingChannel", PollableChannel.class);
        int count = 0;
        while (true) {
            Message<?> msg = consumingChannel.receive(1000);
            System.out.println((count++) + " \t -> " + msg);
            try {
                // sleep to check the number of messages in the queue
                Thread.sleep(50000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

With this configuration it was evident that as soon as a message arrives at consumingChannel, it is acked and therefore removed from the RabbitMQ queue. I confirmed this by setting a long sleep after receive and checking the queue size. There is no further control over it.

Now, if I set acknowledge-mode=MANUAL, there seems to be no way to ack manually using Spring Integration.

I need to process the message and ack manually only after processing, so that the message remains in durableQ until it is acked.

Is there a way to handle MANUAL ack with spring-amqp-integration? I want to avoid passing a ChannelAwareMessageListener to the inbound-channel-adapter, since I want to control when the consumer receives.

Update:

This does not even seem possible when using my own listener-container with the inbound-channel-adapter:

    <!-- Creates a default direct channel (Spring Integration channel) named "adapter";
         to receive, poll this channel, same as above -->
    <int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" />

    <bean id="amqpListenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="queueNames" value="durableQ" />
        <property name="acknowledgeMode" value="MANUAL" />
        <!-- messageListener is not allowed when the container is used with the adapter,
             so there is no way to supply my own ChannelAwareMessageListener;
             no Channel is exposed in onMessage, hence no way to ack -->
        <property name="messageListener" ref="listener"/>
    </bean>

    <bean id="listener" class="com.sd.springint.rmq.MsgListener"/>

The configuration above throws an error because the messageListener property is not allowed; see the inline comment on the tag. So the purpose of using a listener-container (to expose the Channel through a ChannelAwareMessageListener) is defeated.

It seems to me that Spring Integration cannot be used for manual acknowledgement (I know this is a strong claim!). Can someone help me verify this, or is there a specific approach or configuration for this that I am missing?

+3
2 answers

The problem is that you are using an asynchronous handoff via a QueueChannel. It is generally better to control concurrency in the container (concurrent-consumers="2") and not do any asynchronous handoffs in your flow (use DirectChannels). That way, AUTO ack will work just fine. Instead of receiving from a PollableChannel, subscribe a new MessageHandler() to a SubscribableChannel.
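To make the timing argument concrete, here is a minimal plain-Java sketch. It is a toy model, not Spring code: `deliverWithAutoAck`, `directHandoff` and `queueHandoff` are hypothetical names, and the only assumption carried over from AUTO mode is that the container acks once the listener call returns without throwing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Toy model of AUTO ack timing; none of these types are Spring classes. */
class AutoAckTimingSketch {

    /**
     * Stand-in for a container in AUTO mode: it calls the listener and
     * records an ack as soon as the listener returns normally.
     */
    static void deliverWithAutoAck(String payload, Consumer<String> listener, List<String> events) {
        listener.accept(payload);
        events.add("ack " + payload);
    }

    /** Direct handoff: the handler runs inside the listener call, so it finishes before the ack. */
    static List<String> directHandoff(String payload) {
        List<String> events = new ArrayList<>();
        deliverWithAutoAck(payload, p -> events.add("processed " + p), events);
        return events;
    }

    /** Queue handoff: the listener only enqueues, so the ack happens before anyone polls. */
    static List<String> queueHandoff(String payload) {
        List<String> events = new ArrayList<>();
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>(2); // like <int:queue capacity="2"/>
        deliverWithAutoAck(payload, buffer::add, events);
        String polled = buffer.poll(); // the application receives later
        events.add("processed " + polled);
        return events;
    }

    public static void main(String[] args) {
        System.out.println("direct: " + directHandoff("m1")); // [processed m1, ack m1]
        System.out.println("queue:  " + queueHandoff("m1"));  // [ack m1, processed m1]
    }
}
```

In the queue variant the ack is recorded before the payload is processed, which matches what the question observed with the QueueChannel.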

Update:

Normally you do not need to deal with Messages in an SI application, but the equivalent of your test with a DirectChannel would be:

    SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);
    channel.subscribe(new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("Got " + message);
        }
    });
+3

MANUAL Ack is only allowed through Channel.basicAck() . Therefore, you must have access to the Channel on which your message was received.

Try playing with advice-chain in <int-amqp:inbound-channel-adapter> :

  • Add some Advice as a MethodBeforeAdvice
  • The advice-chain on the container is applied to ContainerDelegate#invokeListener
  • The first argument of that method is exactly the Channel
  • Assume you can put that Channel into MessageProperties.headers inside this Advice
  • Then configure <int-amqp:inbound-channel-adapter> with mapped-request-headers so that this Channel header is mapped
  • Finally, try calling basicAck() on that Channel header from the Spring Integration message anywhere downstream in your flow.
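The steps above can be modelled in plain Java. This is only a sketch of the idea, not Spring or RabbitMQ code: `AckChannel`, `Delivery` and the header name `channelRef` are hypothetical stand-ins, and only the `basicAck(long deliveryTag, boolean multiple)` shape is taken from the real client `Channel` (which additionally declares `IOException`).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of "carry the Channel in a header, ack downstream"; all types are stand-ins. */
class ManualAckHeaderSketch {

    /** Stand-in for the RabbitMQ Channel; only the ack call is modelled. */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple);
    }

    /** Stand-in for a received message: payload, delivery tag and mutable headers. */
    static final class Delivery {
        final String payload;
        final long deliveryTag;
        final Map<String, Object> headers = new HashMap<>();

        Delivery(String payload, long deliveryTag) {
            this.payload = payload;
            this.deliveryTag = deliveryTag;
        }
    }

    /** Hypothetical header name, chosen only for this sketch. */
    static final String CHANNEL_HEADER = "channelRef";

    /** Plays the role of the before-advice: it sees (channel, message) and stores the channel. */
    static void beforeInvokeListener(AckChannel channel, Delivery delivery) {
        delivery.headers.put(CHANNEL_HEADER, channel);
    }

    /** Plays the role of a downstream handler: process first, then ack via the header. */
    static void processThenAck(Delivery delivery, List<String> log) {
        log.add("processed " + delivery.payload);
        AckChannel channel = (AckChannel) delivery.headers.get(CHANNEL_HEADER);
        channel.basicAck(delivery.deliveryTag, false); // false: ack this delivery only
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        AckChannel channel = (tag, multiple) -> log.add("basicAck tag=" + tag + " multiple=" + multiple);
        Delivery delivery = new Delivery("m1", 7L);
        beforeInvokeListener(channel, delivery);
        processThenAck(delivery, log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [processed m1, basicAck tag=7 multiple=false]
    }
}
```

Whether a live Channel object survives header mapping in a real flow is exactly the part this answer suggests trying, so treat the model as an illustration of the intended order of events rather than a verified configuration.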
0

Source: https://habr.com/ru/post/1203234/

