I'm trying to come up with a JMS/ActiveMQ implementation that supports rollback using a transacted session.
I'm very new to ActiveMQ and have taken a first stab at the implementation using its Java libraries.
When I launch my application, I see that messages are successfully enqueued and dequeued. I also see that the corresponding DLQ is created automatically. However, I'm not sure whether I have configured the RedeliveryPolicy correctly. At the moment it is configured on the producer, but some examples attach the redelivery policy to the listener container, so I'm not sure whether poisoned messages will end up in the DLQ in my case (if at all). More details are in the code snippets below.
In addition, all the examples I've come across so far use Spring. However, I don't have the option of using it, since that would require reworking the entire project (I'm open to it if it involves only minimal overhead).
Any insight into how I can do this in Java using the ActiveMQ APIs would be greatly appreciated.
Producer:
public void publishUpdate(final MessageBody payload)
        throws JMSException {
    Session session = session(connection());
    try {
        Message message = message(session, payload);
        LOGGER.info("About to put message on queue");
        producer(session).send(message);
        session.commit();
    } catch (BadRequestException e) {
        LOGGER.info("Badly formed request. Not attempting retry!");
        return;
    } catch (JMSException jmsException) {
        LOGGER.info("Caught JMSException will retry");
        session.rollback();
    }
}

private MessageProducer producer(Session session) throws JMSException {
    return session.createProducer(destination());
}

private Connection connection() throws JMSException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    Connection connection = connectionFactory.createConnection();
    connectionFactory.setRedeliveryPolicy(getRedeliveryPolicy());
    return connection;
}

private Session session(Connection connection) throws JMSException {
    Session session = connection.createSession(true,
            Session.SESSION_TRANSACTED);
    connection.start();
    return session;
}
Listener:
public class UpdateMessageListener implements MessageListener {
    ….
    public void onMessage(Message message) {
        String json = null;
        try {
            json = ((TextMessage) message).getText();
            MessageBody request = SerializeUtils.deserialize(json, MessageBody.class);
            processTransaction(request.getUpdateMessageBody(), headers);
        } catch (Throwable e) {
            LOGGER.error("Error processing request: {}", json);
        }
    }
}
Consumer:
private MessageConsumer consumer() throws JMSException {
    LOGGER.info("Creating consumer");
    MessageConsumer consumer = session().createConsumer(destination());
    consumer.setMessageListener(new UpdateMessageListener());
    return consumer;
}

private Session session() throws JMSException {
    Connection connection = connection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    connection.start();
    return session;
}
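
To make the DLQ question concrete, here is a toy model of consumer-side redelivery in plain Java. It is not ActiveMQ code: `RedeliveryModel`, `drain`, and `Result` are hypothetical names, and the handler's boolean return value stands in for "commit" (true) versus "roll back, or let an exception escape `onMessage`" (false). It assumes the ActiveMQ behaviour that a message is delivered once, redelivered up to `maximumRedeliveries` times (6 by default), and only then sent to the DLQ. Under those assumptions, a listener that catches `Throwable` and returns normally, like the one above, would never trigger a redelivery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Toy model of consumer-side redelivery. NOT ActiveMQ code; all names are hypothetical. */
final class RedeliveryModel {

    /** What happened to each message after the queue was drained. */
    static final class Result {
        final List<String> acknowledged = new ArrayList<>();
        final List<String> deadLettered = new ArrayList<>();
    }

    /**
     * Delivers every message to the handler. The handler returns true to
     * acknowledge (commit) and false to request redelivery (rollback).
     * A message that still fails after the first delivery plus
     * maximumRedeliveries redeliveries is dead-lettered.
     */
    static Result drain(List<String> messages, int maximumRedeliveries,
                        Predicate<String> handler) {
        Result result = new Result();
        for (String message : messages) {
            boolean done = false;
            // attempt 0 is the first delivery; attempts 1..max are redeliveries
            for (int attempt = 0; attempt <= maximumRedeliveries && !done; attempt++) {
                done = handler.test(message);
            }
            if (done) {
                result.acknowledged.add(message);
            } else {
                result.deadLettered.add(message);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queue = Arrays.asList("ok", "poison");

        // Handler that reports failure, as a rollback or rethrow would.
        Result rollingBack = drain(queue, 6, m -> !m.equals("poison"));
        System.out.println(rollingBack.acknowledged + " " + rollingBack.deadLettered);
        // prints: [ok] [poison]

        // Handler that swallows every error, like catch (Throwable e) { log }.
        Result swallowing = drain(queue, 6, m -> true);
        System.out.println(swallowing.acknowledged + " " + swallowing.deadLettered);
        // prints: [ok, poison] []
    }
}
```

If this model matches the real behaviour, the listener would have to rethrow, or the consumer session would have to be transacted and rolled back, for a poisoned message to reach the DLQ.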
I can also provide additional code if necessary.