Spring JmsTemplate: receive all messages in a single transaction

I am trying to receive all messages from a queue synchronously using Spring's JmsTemplate.receive(String).

The problem is that I always get only one message. Here is the code:

    @Transactional
    public List<Message> receiveAllFromQueue(String destination) {
        List<Message> messages = new ArrayList<Message>();
        Message message;
        while ((message = queueJmsTemplate.receive(destination)) != null) {
            messages.add(message);
        }
        return messages;
    }

If I remove the @Transactional annotation, I get all the messages, but they are then received outside of a transaction, so if an exception occurs later while processing them, the messages are lost.

Here is the definition of my JMSTemplate bean.

    <bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="pubSubDomain" value="false" />
        <property name="receiveTimeout" value="1" />
        <property name="sessionTransacted" value="true" />
    </bean>

What I want is a single transaction in which I receive all pending messages.
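To make the desired behaviour concrete, here is a minimal sketch in plain Java. It does not use Spring or JMS: InMemoryTransactedQueue, DrainInOneTransaction and BatchHandler are hypothetical stand-ins invented for illustration. The model assumes that a rollback returns every message received since the last commit to the queue, which is the effect the question is after.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory stand-in for a transacted session; not a Spring or JMS class.
final class InMemoryTransactedQueue {
    private final Deque<String> pending = new ArrayDeque<>();
    private final List<String> uncommitted = new ArrayList<>();

    void send(String body) {
        pending.addLast(body);
    }

    // Non-blocking receive: returns null when nothing is pending.
    String receiveNoWait() {
        String body = pending.pollFirst();
        if (body != null) {
            uncommitted.add(body);
        }
        return body;
    }

    // Commit: everything received so far is consumed for good.
    void commit() {
        uncommitted.clear();
    }

    // Rollback: received messages return to the front of the queue in their original order.
    void rollback() {
        for (int i = uncommitted.size() - 1; i >= 0; i--) {
            pending.addFirst(uncommitted.get(i));
        }
        uncommitted.clear();
    }

    int size() {
        return pending.size();
    }
}

final class DrainInOneTransaction {
    // Hypothetical callback for processing the received batch.
    interface BatchHandler {
        void handle(List<String> batch) throws Exception;
    }

    // Receives all pending messages, processes them, and commits once.
    // Any failure rolls back, so no message is lost.
    static boolean drainAndProcess(InMemoryTransactedQueue queue, BatchHandler handler) {
        List<String> batch = new ArrayList<>();
        try {
            String body;
            while ((body = queue.receiveNoWait()) != null) {
                batch.add(body);
            }
            handler.handle(batch);
            queue.commit();
            return true;
        } catch (Exception e) {
            queue.rollback();
            return false;
        }
    }

    public static void main(String[] args) {
        InMemoryTransactedQueue queue = new InMemoryTransactedQueue();
        queue.send("a");
        queue.send("b");
        boolean committed = drainAndProcess(queue, batch -> {
            throw new IllegalStateException("processing failed");
        });
        System.out.println("committed=" + committed + ", still queued=" + queue.size());
    }
}
```

In this model a failing handler leaves both messages on the queue, while a successful one removes the whole batch with a single commit.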

1 answer

I will answer my own question. It looks like JmsTemplate does not support this. The only workaround I found is to extend JmsTemplate and add a new method that reuses parts of its implementation. Unfortunately, some of those methods are private, so they have to be copied.

    import java.util.ArrayList;
    import java.util.List;

    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;

    import org.springframework.jms.connection.JmsResourceHolder;
    import org.springframework.jms.core.SessionCallback;
    import org.springframework.jms.support.JmsUtils;
    import org.springframework.transaction.support.TransactionSynchronizationManager;

    // JmsTemplateDelegate is not shown in this post; presumably it is a JmsTemplate
    // subclass, since the inherited calls below are JmsTemplate methods.
    public class CustomQueueJmsTemplate extends JmsTemplateDelegate {

        public List<Message> receiveAll(String destinationName) {
            return receiveAll(destinationName, null);
        }

        public List<Message> receiveAll(final String destinationName, final String messageSelector) {
            return execute(new SessionCallback<List<Message>>() {
                @Override
                public List<Message> doInJms(Session session) throws JMSException {
                    Destination destination = resolveDestinationName(session, destinationName);
                    return doReceiveAll(session, destination, messageSelector);
                }
            }, true);
        }

        private List<Message> doReceiveAll(Session session, Destination destination, String messageSelector)
                throws JMSException {
            return doReceiveAll(session, createConsumer(session, destination, messageSelector));
        }

        private List<Message> doReceiveAll(Session session, MessageConsumer consumer) throws JMSException {
            try {
                // Use transaction timeout (if available).
                long timeout = getReceiveTimeout();
                JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
                        .getResource(getConnectionFactory());
                if (resourceHolder != null && resourceHolder.hasTimeout()) {
                    timeout = resourceHolder.getTimeToLiveInMillis();
                }

                // START OF MODIFIED CODE
                // Keep receiving on the same consumer until no message arrives within the timeout.
                List<Message> messages = new ArrayList<>();
                Message message;
                while ((message = doReceive(consumer, timeout)) != null) {
                    messages.add(message);
                }
                // END OF MODIFIED CODE

                if (session.getTransacted()) {
                    // Commit necessary - but avoid commit call within a JTA transaction.
                    if (isSessionLocallyTransacted(session)) {
                        // Transacted session created by this template -> commit.
                        JmsUtils.commitIfNecessary(session);
                    }
                } else if (isClientAcknowledge(session)) {
                    // Manually acknowledge messages, if any.
                    for (Message retrievedMessage : messages) {
                        retrievedMessage.acknowledge();
                    }
                }
                return messages;
            } finally {
                JmsUtils.closeMessageConsumer(consumer);
            }
        }

        // Copied from JmsTemplate, where this method is private.
        private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {
            if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
                return consumer.receiveNoWait();
            } else if (timeout > 0) {
                return consumer.receive(timeout);
            } else {
                return consumer.receive();
            }
        }
    }
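One detail of the receive loop above is worth spelling out: it only ends when a receive call returns null. That happens with a positive timeout (the question's bean uses 1 ms) or with RECEIVE_TIMEOUT_NO_WAIT. With a timeout of 0, consumer.receive() blocks until a message arrives, so the loop would never finish. The sketch below models the same dispatch with a java.util.concurrent.BlockingQueue instead of a JMS consumer. ReceiveTimeoutSketch and its methods are hypothetical names for illustration, and rejecting blocking timeouts up front is an assumption of this sketch, not something the answer's code does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in that mirrors the timeout dispatch of doReceive; not a Spring class.
final class ReceiveTimeoutSketch {
    // Same numeric values as JmsTemplate.RECEIVE_TIMEOUT_NO_WAIT and
    // JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT.
    static final long NO_WAIT = -1;
    static final long INDEFINITE_WAIT = 0;

    // Mirrors doReceive: non-blocking, timed, or blocking receive depending on the timeout.
    static String receive(BlockingQueue<String> queue, long timeoutMillis) throws InterruptedException {
        if (timeoutMillis == NO_WAIT) {
            return queue.poll();
        } else if (timeoutMillis > 0) {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } else {
            return queue.take(); // blocks until an element is available, never returns null
        }
    }

    // Drains the queue; refuses timeouts that would make the loop block forever.
    static List<String> receiveAll(BlockingQueue<String> queue, long timeoutMillis) {
        if (timeoutMillis != NO_WAIT && timeoutMillis <= 0) {
            throw new IllegalArgumentException("a blocking receive never returns null; the loop would not end");
        }
        List<String> messages = new ArrayList<>();
        try {
            String message;
            while ((message = receive(queue, timeoutMillis)) != null) {
                messages.add(message);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was collected so far.
            Thread.currentThread().interrupt();
        }
        return messages;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        queue.add("m2");
        System.out.println(receiveAll(queue, 1));
    }
}
```

With the question's configuration (receiveTimeout set to 1), the loop in the answer stops as soon as the queue stays empty for one millisecond.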

Source: https://habr.com/ru/post/1494255/

