Can you programmatically configure the dead letter handling of a queue in an embedded ActiveMQ broker (Java)?

Background

At a high level, I have a Java application in which certain events should trigger a specific action for the current user. However, the events can be very frequent, and the action is always the same. So when the first event occurs, I would like to schedule the action for some point in the near future (say, 5 minutes). During that window, subsequent events should trigger nothing, because the application sees that an action is already scheduled. Once the scheduled action completes, we are back at step 1, and the next event starts the cycle again.

My idea is to implement this filtering and throttling mechanism by embedding an in-memory ActiveMQ instance within the application itself (I don't care about persisting the queue).

I believe JMS 2.0 supports this concept of delivery delay, with delayed messages sitting in a "staging queue" until it is time to deliver them to the real destination. However, I also believe ActiveMQ does not yet support the JMS 2.0 specification, so I am thinking of simulating the same behavior with time-to-live (TTL) values and dead letter queue (DLQ) handling.

Basically, my message producer code would put messages on a dummy staging queue from which no consumer ever pulls anything. Messages would be sent with a 5-minute TTL, and upon expiration ActiveMQ would move them to a DLQ. That is the queue my message consumers would actually consume from.

Question

I don't think I want to consume from the default DLQ, because I have no idea what other internal ActiveMQ things might get dumped there that are completely unrelated to my application code. So I think it would be better for my dummy staging queue to have its own DLQ. I have found only one page of ActiveMQ documentation that discusses DLQ configuration, and it only covers XML configuration files for a standalone ActiveMQ installation (not an in-memory broker embedded in an application).

Is it possible to programmatically configure a custom DLQ at run time for a queue in an embedded ActiveMQ instance?

I would also be interested in alternative suggestions if you think I am on the wrong track. I am more familiar with JMS than AMQP, so I don't know whether this would be much easier with Qpid or some other AMQP broker implemented in Java. Whatever Apache Camel is (!), I believe it is supposed to excel at this kind of thing, but its learning curve may be gross overkill for this use case.

+6
3 answers

Although you are concerned that Camel might be gross overkill for this use case, I think ActiveMQ is already gross overkill for the use case you described.

You want something to happen 5 minutes after an event occurs, and you want it to consume only the first event and ignore all the ones that arrive between the first event and the end of the 5 minutes, right? Why not just schedule your processing method to run in 5 minutes using a ScheduledExecutorService (or your favorite scheduling engine) and store the event in a HashMap<User, Event> member variable? If more events arrive for that user before the processing runs, you will simply see that you already have an event and not store the new one, so you ignore everything except the first. At the end of your processing method, remove the event for that user from your HashMap, and the next event that comes in will be stored and scheduled.
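As a rough illustration of that idea, here is a minimal sketch. The class name `FirstEventThrottler` and its methods are hypothetical, a `ConcurrentHashMap` is swapped in for the plain `HashMap` so the check-and-store step is atomic, and the delay is a constructor parameter so it can be set to 5 minutes in production or to something short when experimenting.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical helper: keeps only the first event per user and handles it after a delay.
final class FirstEventThrottler<U, E> {
    // Users that currently have a scheduled action, mapped to their first (non-null) event.
    private final ConcurrentMap<U, E> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long delay;
    private final TimeUnit unit;
    private final BiConsumer<U, E> handler;

    FirstEventThrottler(long delay, TimeUnit unit, BiConsumer<U, E> handler) {
        this.delay = delay;
        this.unit = unit;
        this.handler = handler;
    }

    /** Returns true if the event was stored and scheduled, false if it was ignored. */
    boolean onEvent(U user, E event) {
        if (pending.putIfAbsent(user, event) != null) {
            return false; // an action is already scheduled for this user
        }
        scheduler.schedule(() -> {
            try {
                handler.accept(user, event);
            } finally {
                pending.remove(user); // the next event starts a new cycle
            }
        }, delay, unit);
        return true;
    }

    /** Stops accepting events, lets already scheduled actions run, and waits for them. */
    boolean shutdownAndWait(long timeout, TimeUnit timeoutUnit) {
        scheduler.shutdown();
        try {
            return scheduler.awaitTermination(timeout, timeoutUnit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

You would create it once, for example as `new FirstEventThrottler<>(5, TimeUnit.MINUTES, (user, event) -> ...)`, and call `onEvent` from wherever events are raised. Because the entry is removed only after the handler returns, events that arrive while the handler is running are still ignored.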

Running ActiveMQ just to get this behavior seems like far more than you need. Or, if it isn't, can you explain why?

EDIT:

If you do go the ActiveMQ route, don't use message TTL to expire your messages; just have the (one and only) consumer read them into memory and use the in-memory solution described above to process (at most) one batch every 5 minutes. Either have a single queue with message selectors, or use dynamic queues, one per user. You don't need a DLQ to implement the delay, and even if you could get it to do that, it would not give you the ability to batch everything up so that you only run once every 5 minutes. This is not a path you want to go down, even if you figure out how to do it.
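A minimal sketch of that in-memory batching step, assuming a hypothetical `BatchingConsumer` class whose `onMessage(user, payload)` method is called by the single queue consumer for each message it receives (the JMS wiring itself is left out, and users and payloads are plain strings for brevity):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical helper: collects messages per user and hands each batch over once per delay.
final class BatchingConsumer {
    private final Map<String, List<String>> batches = new HashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long delay;
    private final TimeUnit unit;
    private final BiConsumer<String, List<String>> batchHandler;

    BatchingConsumer(long delay, TimeUnit unit, BiConsumer<String, List<String>> batchHandler) {
        this.delay = delay;
        this.unit = unit;
        this.batchHandler = batchHandler;
    }

    /** Called by the single queue consumer for every message it reads. */
    synchronized void onMessage(String user, String payload) {
        List<String> batch = batches.get(user);
        if (batch == null) {
            // First message for this user: open a batch and schedule its flush.
            batch = new ArrayList<>();
            batches.put(user, batch);
            scheduler.schedule(() -> flush(user), delay, unit);
        }
        batch.add(payload);
    }

    private void flush(String user) {
        List<String> batch;
        synchronized (this) {
            batch = batches.remove(user); // later messages open a new batch
        }
        if (batch != null) {
            batchHandler.accept(user, batch); // runs outside the lock
        }
    }

    /** Lets already scheduled flushes run and waits for them to finish. */
    boolean shutdownAndWait(long timeout, TimeUnit timeoutUnit) {
        scheduler.shutdown();
        try {
            return scheduler.awaitTermination(timeout, timeoutUnit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this sketch the batch is removed before the handler runs, so a message that arrives during processing starts the next batch instead of being dropped. Whether that or the ignore-while-running behavior is preferable depends on the application.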

+4

A simple solution tracks pending actions in a concurrent structure and uses a ScheduledExecutorService to execute them:

    private static final Object RUNNING = new Object();
    private final ConcurrentMap<UserId, Object> pendingActions = new ConcurrentHashMap<>();
    private final ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);

    public void takeAction(final UserId id) {
        Object running = pendingActions.putIfAbsent(id, RUNNING); // atomic
        if (running == null) { // no pending action for this user
            ses.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        doWork();
                    } finally {
                        // always clear the marker, even if doWork() throws,
                        // so the next event can schedule a new action
                        pendingActions.remove(id);
                    }
                }
            }, 5, TimeUnit.MINUTES);
        }
    }
+1

With Camel, this can easily be achieved using the Aggregator component with the completionInterval parameter set. Every five minutes you check whether the list of aggregated messages is empty; if it isn't, you send a message to the route responsible for your user action and empty the list. You do not need to keep the whole list of exchanges, just the state (user action is planned or not).
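For readers who do not know Camel, the interval-based check can be approximated in plain Java. The sketch below is not Camel code and does not use the Aggregator component; it is a stand-in built on `scheduleAtFixedRate`, and the class name `IntervalTrigger` and its methods are hypothetical. It keeps only the per-user "action is planned" state, as the answer suggests.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical plain-Java analogue of an interval-completed aggregation.
final class IntervalTrigger<U> {
    // Users for whom at least one event arrived since the last tick.
    private final Set<U> flagged = ConcurrentHashMap.newKeySet();
    private final Consumer<U> action;

    IntervalTrigger(Consumer<U> action) {
        this.action = action;
    }

    /** Records that the user needs the action; repeated events collapse into one flag. */
    void onEvent(U user) {
        flagged.add(user);
    }

    /** One interval tick: runs the action once per flagged user and clears the flags. */
    int tick() {
        int fired = 0;
        for (U user : flagged) {
            if (flagged.remove(user)) {
                action.accept(user);
                fired++;
            }
        }
        return fired;
    }

    /** Runs tick() at a fixed interval on the given scheduler. */
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long interval, TimeUnit unit) {
        // Note: if the action throws, the scheduler suppresses all later ticks.
        return scheduler.scheduleAtFixedRate(this::tick, interval, interval, unit);
    }
}
```

One difference from the delay-based approach is worth keeping in mind: with a fixed interval, the action runs at the next tick, so the wait after the first event is anywhere between zero and the full interval and not always exactly 5 minutes.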

+1

Source: https://habr.com/ru/post/975735/
