Am I implementing ActiveMQ correctly? Transacted sessions and retries

I'm trying to come up with a JMS/ActiveMQ implementation that supports rollback using a transacted session.
I'm very new to ActiveMQ and have taken a first stab at the implementation using its Java libraries.

When I launch my application, I can see that messages are successfully enqueued and dequeued. I also see that the corresponding DLQ is created automatically. However, I'm not sure whether I have configured the redelivery policy correctly. At the moment it is configured on the producer, but some examples attach the redelivery policy to the listener container, so I'm not sure whether poisoned messages will end up in the DLQ in my case (if at all). Detailed comments are in the code snippets.

In addition, all the examples I've come across so far use Spring. However, I don't have the option of using it, since that would require reworking the entire project (I'm open to it if it involves only minimal overhead).

Any insight into how I can do this in plain Java using the ActiveMQ APIs would be greatly appreciated.

Producer:

public void publishUpdate(final MessageBody payload) throws JMSException {
    Session session = session(connection());
    try {
        Message message = message(session, payload);
        LOGGER.info("About to put message on queue");
        producer(session).send(message);
        // without session.commit() no messages get put on the queue.
        session.commit(); // messages seem to be enqueued now.
    } catch (BadRequestException e) { // to avoid badly formed requests?
        LOGGER.info("Badly formed request. Not attempting retry!");
        return;
    } catch (JMSException jmsException) {
        LOGGER.info("Caught JMSException will retry");
        session.rollback(); // assume rollback is followed by a retry?
    }
}

private MessageProducer producer(Session session) throws JMSException {
    return session.createProducer(destination());
}

private Connection connection() throws JMSException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    Connection connection = connectionFactory.createConnection();
    connectionFactory.setRedeliveryPolicy(getRedeliveryPolicy()); // redelivery policy with three retries and redelivery time of 1000ms
    return connection;
}

private Session session(Connection connection) throws JMSException {
    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    connection.start();
    return session;
}

Listener:

public class UpdateMessageListener implements MessageListener{
….
    public void onMessage(Message message) {
        String json = null;
        try {
            //Does the listener need to do anything to deal with retry?
            json = ((TextMessage) message).getText();
            MessageBody request = SerializeUtils.deserialize(json, MessageBody.class);
            processTransaction(request.getUpdateMessageBody(), headers);//perform some additional processing.
        } catch (Throwable e) {
            LOGGER.error("Error processing request: {}", json);
        }
    }
}

Consumer:

private MessageConsumer consumer() throws JMSException {
    LOGGER.info("Creating consumer");
    MessageConsumer consumer = session().createConsumer(destination());
    consumer.setMessageListener(new UpdateMessageListener()); // wire listener to consumer
    return consumer;
}

private Session session() throws JMSException {
    Connection connection = connection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // create an auto-ack from the consumer side? Is this correct?
    connection.start();
    return session;
}

I can also provide additional code if necessary.

2 answers

Your solution has a slight flaw.

According to the JMS documentation, in Session.AUTO_ACKNOWLEDGE mode a message is redelivered by the message broker (for example, ActiveMQ) if an exception is thrown from the onMessage function. But that flow is broken here, because the listener catches Throwable inside onMessage, so the exception never reaches the consumer. The data flow is shown below (image not available):

ACK_TYPE

Pseudocode for the dispatch on the consumer side:

// Pseudocode, not the actual ActiveMQ source
Session session = connection.getSession(consumerId);
sessionQueueBuffer.enqueue(message);
Runnable runnable = new Runnable() {
    public void run() {
        Consumer consumer = session.getConsumer(consumerId);
        Message md = sessionQueueBuffer.dequeue();
        try {
            consumer.messageListener.onMessage(md);
            ack(md); // send a STANDARD_ACK_TYPE to tell the broker the message was consumed
        } catch (Exception e) {
            redelivery(); // send a redelivery ack (DELIVERED_ACK_TYPE) so the broker keeps the message and redelivers it
        }
    }
};
threadPool.execute(runnable);
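
The effect of catching Throwable inside onMessage can be seen in a toy model of the dispatch loop above. This is a minimal sketch in plain Java with no JMS dependency. The names RedeliveryModel, Outcome, dispatch and process are hypothetical, the limit of three redeliveries is taken from the policy described in the question, and none of this is ActiveMQ's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of the dispatch loop. Hypothetical names; not ActiveMQ code. */
public class RedeliveryModel {

    /** What happened to one message. */
    public static final class Outcome {
        public final int deliveries;       // number of times the listener was invoked
        public final boolean acked;        // listener returned normally, message consumed
        public final boolean deadLettered; // redeliveries exhausted, message moved to the DLQ

        public Outcome(int deliveries, boolean acked, boolean deadLettered) {
            this.deliveries = deliveries;
            this.acked = acked;
            this.deadLettered = deadLettered;
        }

        @Override
        public String toString() {
            return "deliveries=" + deliveries + " acked=" + acked
                    + " deadLettered=" + deadLettered;
        }
    }

    /**
     * Delivers one message to the listener. A normal return counts as an ack.
     * A RuntimeException triggers redelivery until maxRedeliveries is used up,
     * after which the message is added to the (modelled) dead letter queue.
     */
    public static Outcome dispatch(String message, Consumer<String> listener,
                                   int maxRedeliveries, List<String> dlq) {
        int deliveries = 0;
        while (true) {
            deliveries++;
            try {
                listener.accept(message);
                return new Outcome(deliveries, true, false);
            } catch (RuntimeException e) {
                int redeliveriesSoFar = deliveries - 1;
                if (redeliveriesSoFar >= maxRedeliveries) {
                    dlq.add(message);
                    return new Outcome(deliveries, false, true);
                }
                // otherwise fall through and deliver the message again
            }
        }
    }

    /** Stand-in for real processing; always fails in this demo. */
    static void process(String message) {
        throw new IllegalStateException("cannot process " + message);
    }

    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();

        // Shaped like the listener in the question: the failure is caught and dropped.
        Consumer<String> swallowing = msg -> {
            try {
                process(msg);
            } catch (Throwable t) {
                // logged and dropped, so the dispatcher never sees the failure
            }
        };
        // Lets the failure propagate to the dispatcher.
        Consumer<String> propagating = RedeliveryModel::process;

        System.out.println("swallowing:  " + dispatch("m1", swallowing, 3, dlq));
        System.out.println("propagating: " + dispatch("m2", propagating, 3, dlq));
        System.out.println("DLQ: " + dlq);
    }
}
```

In this model the swallowing listener is acked after a single delivery and nothing reaches the DLQ, while the propagating listener is delivered four times (one initial attempt plus three redeliveries) and then dead-lettered. For a real listener the equivalent change would be to let a RuntimeException escape from onMessage, since onMessage cannot throw checked exceptions.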

  • MessageListener b/c, . receive(). .. ActiveMQSession, INDIVIDUAL_MESSAGE .


Source: https://habr.com/ru/post/1612638/

