Unable to get ActiveMQ to resend my messages

I have a single-threaded ActiveMQ consumer written in Java. All I am trying to do is receive() a message from the queue, try to send it to a web service, and acknowledge() it if the call succeeds. If the web service call fails, I want the message to remain in the queue and be redelivered after some timeout.

This is more or less working, except for the redelivery part: every time I restart my consumer, it receives each message that is still in the queue once, but after sending them fails, the messages are never redelivered.

My code looks like this:

    public boolean init() throws JMSException, FileNotFoundException, IOException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setInitialRedeliveryDelay(500);
        policy.setBackOffMultiplier(2);
        policy.setUseExponentialBackOff(true);
        connectionFactory.setRedeliveryPolicy(policy);
        connectionFactory.setUseRetroactiveConsumer(true); // ????
        Connection connection = connectionFactory.createConnection();
        connection.setExceptionListener(this);
        connection.start();
        session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        destination = session.createQueue(subject); //???
        consumer = session.createConsumer(destination);
        //consumer.setMessageListener(this); // message listener had same behaviour
    }

    private void process() {
        while (true) {
            System.out.println("Waiting...");
            try {
                Message message = consumer.receive();
                onMessage(message);
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void onMessage(Message message) {
        System.out.println("onMessage");
        messagesReceived++;
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                if (!client.sendMessage(msg)) {
                    System.out.println("Webservice call failed. Keeping message");
                    //message.
                } else {
                    message.acknowledge();
                }
                if (transacted) {
                    if ((messagesReceived % batch) == 0) {
                        System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
                        session.commit();
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

I am not currently using transactions (maybe I should be?).

I'm sure I'm missing something simple and will soon be slapping my forehead, but I can't figure out how this is supposed to work. Thanks!


EDIT: I can't answer my own question yet because I don't have enough reputation, so:

OK, after several experiments, it turns out that using transactions is the only way I could get this to work. Here is the new code:

    public boolean init() throws JMSException, FileNotFoundException, IOException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setInitialRedeliveryDelay(1000L);
        policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
        connectionFactory.setRedeliveryPolicy(policy);
        connectionFactory.setUseRetroactiveConsumer(true);
        Connection connection = connectionFactory.createConnection();
        connection.setExceptionListener(this);
        connection.start();
        session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);
    }

    @Override
    public void onMessage(Message message) {
        System.out.println("onMessage");
        messagesReceived++;
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                if (client.sendMessage(msg)) {
                    if (transacted) {
                        System.out.println("Call succeeded - committing message");
                        session.commit();
                    }
                    //message.acknowledge();
                } else {
                    if (transacted) {
                        System.out.println("Webservice call failed. Rolling back message");
                        session.rollback();
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

Now the message is redelivered every 1000 ms, as specified in the redelivery policy.

Hope this helps someone else! :)

1 answer

You do not need to use transactions; CLIENT_ACKNOWLEDGE combined with Session.recover() will also work.

Messages are redelivered to a client when any of the following occurs:

  • A transacted session is used and rollback() is called.
  • A transacted session is closed before commit() is called.
  • A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.

See http://activemq.apache.org/message-redelivery-and-dlq-handling.html
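The CLIENT_ACKNOWLEDGE route could look roughly like the sketch below. It is only an illustration under stated assumptions, not tested against a broker: the session is assumed to be created non-transacted with connection.createSession(false, Session.CLIENT_ACKNOWLEDGE), and the names RedeliverSketch, JmsAction and handle are hypothetical. To keep it runnable without the JMS jar, the two JMS calls are passed in as small callbacks; in the consumer from the question they would be message::acknowledge and session::recover, with client::sendMessage as the web service call.

```java
import java.util.function.Predicate;

class RedeliverSketch {

    /** Hypothetical stand-in for a JMS call such as message.acknowledge() or session.recover(). */
    @FunctionalInterface
    interface JmsAction {
        void run() throws Exception;
    }

    /**
     * Acknowledges the message if the web service accepted it; otherwise asks
     * the session to recover so the broker redelivers it.
     * Returns true if the message was acknowledged.
     */
    static boolean handle(String body, Predicate<String> sendToWebService,
                          JmsAction acknowledge, JmsAction recover) {
        try {
            if (sendToWebService.test(body)) {
                acknowledge.run();   // real code: message::acknowledge
                return true;
            }
            recover.run();           // real code: session::recover
            return false;
        } catch (Exception e) {
            throw new IllegalStateException("JMS call failed", e);
        }
    }

    public static void main(String[] args) {
        int[] acks = {0};
        int[] recovers = {0};
        // Hypothetical web service that rejects empty payloads.
        Predicate<String> service = s -> !s.isEmpty();
        handle("hello", service, () -> acks[0]++, () -> recovers[0]++);
        handle("", service, () -> acks[0]++, () -> recovers[0]++);
        System.out.println("acks=" + acks[0] + " recovers=" + recovers[0]); // prints "acks=1 recovers=1"
    }
}
```

Keep in mind that in a CLIENT_ACKNOWLEDGE session, acknowledge() acknowledges every message the session has consumed so far, and recover() restarts delivery from the oldest unacknowledged message, so this fits best when messages are handled one at a time, as in the single-threaded consumer above.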


Source: https://habr.com/ru/post/904159/

