I have a single-threaded ActiveMQ consumer written in Java. All I am trying to do is receive() a message from the queue, attempt to send it to a web service, and acknowledge() it if the call succeeds. If the web service call fails, I want the message to remain in the queue and be redelivered after some timeout.
This is more or less working, except for the redelivery part: every time I restart my consumer, it receives each message that is still in the queue once, but after sending fails, the messages are never redelivered.
My code looks like this:
    public boolean init() throws JMSException, FileNotFoundException, IOException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setInitialRedeliveryDelay(500);
        policy.setBackOffMultiplier(2);
        policy.setUseExponentialBackOff(true);
        connectionFactory.setRedeliveryPolicy(policy);
        connectionFactory.setUseRetroactiveConsumer(true); // ????
        Connection connection = connectionFactory.createConnection();
        connection.setExceptionListener(this);
        connection.start();

        session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        destination = session.createQueue(subject); //???
        consumer = session.createConsumer(destination);
        //consumer.setMessageListener(this); // message listener had same behaviour
    }

    private void process() {
        while(true) {
            System.out.println("Waiting...");
            try {
                Message message = consumer.receive();
                onMessage(message);
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void onMessage(Message message) {
        System.out.println("onMessage");
        messagesReceived++;
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                if(!client.sendMessage(msg)) {
                    System.out.println("Webservice call failed. Keeping message");
                    //message.
                } else {
                    message.acknowledge();
                }
                if (transacted) {
                    if ((messagesReceived % batch) == 0) {
                        System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
                        session.commit();
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
I am not currently using transactions (maybe I should be?).
I'm sure that I'm missing something simple and will soon be slapping my forehead, but I can't figure out how this is supposed to work. Thanks!
EDIT: I can't post this as an answer myself because I don't have enough reputation:
OK, after several experiments, it turns out that using a transacted session was the only approach I could get to work. Here is the new code:
    public boolean init() throws JMSException, FileNotFoundException, IOException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setInitialRedeliveryDelay(1000L);
        policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
        connectionFactory.setRedeliveryPolicy(policy);
        connectionFactory.setUseRetroactiveConsumer(true);
        Connection connection = connectionFactory.createConnection();
        connection.setExceptionListener(this);
        connection.start();

        session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);
    }

    @Override
    public void onMessage(Message message) {
        System.out.println("onMessage");
        messagesReceived++;
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                if(client.sendMessage(msg)) {
                    if(transacted) {
                        System.out.println("Call succeeded - committing message");
                        session.commit();
                    }
Now the message is redelivered every 1000 ms, as specified in the redelivery policy.
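For anyone following along: with a transacted session, the broker redelivers a message only when the session is rolled back instead of committed, so the failure path matters as much as the success path. Below is a minimal sketch of that commit-or-rollback decision, not my exact code. `TxSession` and `RecordingSession` are hypothetical stand-ins for the `commit()` and `rollback()` methods of `javax.jms.Session`, so the example runs without a broker, and the `webService` predicate is an assumed placeholder for `client.sendMessage(msg)`.

```java
import java.util.function.Predicate;

class RedeliverySketch {

    // Hypothetical stand-in for the two javax.jms.Session methods used here.
    // The real commit() and rollback() declare "throws JMSException".
    interface TxSession {
        void commit();
        void rollback();
    }

    // Test double that only counts calls; a real session would talk to the broker.
    static class RecordingSession implements TxSession {
        int commits;
        int rollbacks;

        @Override
        public void commit() { commits++; }

        @Override
        public void rollback() { rollbacks++; }
    }

    // Commits when the web service accepts the message, rolls back otherwise.
    // On a real transacted session, the rollback is what makes the message
    // eligible for redelivery according to the configured RedeliveryPolicy.
    static boolean handle(String body, Predicate<String> webService, TxSession session) {
        boolean delivered;
        try {
            delivered = webService.test(body);
        } catch (RuntimeException e) {
            // Treat an exception from the call the same as a failed call.
            delivered = false;
        }
        if (delivered) {
            session.commit();
        } else {
            session.rollback();
        }
        return delivered;
    }

    public static void main(String[] args) {
        RecordingSession session = new RecordingSession();
        handle("accepted", body -> true, session);   // simulated successful call
        handle("rejected", body -> false, session);  // simulated failed call
        System.out.println("commits=" + session.commits + ", rollbacks=" + session.rollbacks);
        // prints "commits=1, rollbacks=1"
    }
}
```

In the real consumer, the same branch would sit inside `onMessage`, calling `session.commit()` or `session.rollback()` on the session created with `transacted` set to true.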
Hope this helps someone else! :)