After Google for a few days, and I believe that I completely lost. I would like to implement a kind of priority queue that has about 3 queues:
- a high priority queue (daily) that should be the first.
- medium priority queue (weekly), which will be processed if there are no elements 1 in the queue (this is a normal message in this queue, it is never processed at all)
- low priority queue (monthly), which will be processed if there are no # 1 and # 2 in the queue (this is a normal message in this queue, it is never processed at all)
Initially, I have the following thread, so that the consumer consumes messages from all three queues and checks to see if there are any elements in queues # 1, # 2 and # 3. and then I understand that this is wrong, because:
- I completely lost the question: "How do I know in which line it goes?".
- I already consume the message regardless of any queue. Therefore, if I get an object from a queue with a lower priority, will I go back to the queue if I find out that there is a message in the queue with a higher priority?
Below are my current configurations, which show that I'm an idiot.
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <rabbit:connection-factory id="connectionFactory" host="localhost" /> <rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory" exchange="" routing-key="daily_queue"/> <rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory" exchange="" routing-key="weekly_queue"/> <rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory" exchange="" routing-key="monthly_queue"/> <rabbit:admin connection-factory="connectionFactory" /> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" /> </rabbit:listener-container> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" /> </rabbit:listener-container> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" /> </rabbit:listener-container> <bean id="Consumer" class="com.test.Consumer" /> </beans>
Any idea how I can solve this priority queue problem?
ps: I also wonder if Apache Camel has something I can rely on?
UPDATE 1: I just saw this on Apache Camel: " https://issues.apache.org/jira/browse/CAMEL-2537 " JMSPriority sequencer seems to be looking for what it is looking for, has anyone tried this before?
UPDATE 2: assuming that I am using the RabbitMQ plugin base in @Gary Russell's recommendation, I have the following spring -rabbitmq XML context configuration that seems to make sense (as a guest ..):
<rabbit:queue name="ad_google_dfa_reporting_queue"> <rabbit:queue-arguments> <entry key="x-max-priority" value="10"/> </rabbit:queue-arguments> </rabbit:queue> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="adGoogleDfaReporting" method="consume" queue-names="ad_google_dfa_reporting_queue" /> </rabbit:listener-container> <bean id="Consumer" class="com.test.Consumer" />
In the above xml configuration, a queue was successfully created with the name: "ad_google_dfa_reporting_queue" and with parameter arguments: x-max-priority: 10 and durable: true
But not when the code arrives sending a message with priority, I completely lost it. How to determine priority as a reference in the example URL: https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java
AmqpTemplate amqpTemplateGoogleDfaReporting = (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting"); amqpTemplateGoogleDfaReporting.convertAndSend("message");
UPDATE 3: based on @Gary's answer, I can send a message with the priority set in the message, as shown below:
However, when I sent 1000 messages with a random priority between 1-10, the consumer consumes a message with all priorities. (I expected the first high priority message to be consumed first). Below is the code for the message producer:
Random random = new Random(); for (int i=0; i< 1000; i++){ final int priority = random.nextInt(10 - 1 + 1) + 1; DfaReportingModel model = new DfaReportingModel(); model.setReportType(DfaReportingModel.ReportType.FACT); model.setUserProfileId(0l + priority); amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setPriority(priority); return message; } }); }
And the following code for the message consumer:
public void consume(DfaReportingModel message) { System.out.println(message.getUserProfileId()); Thread.sleep(500); }
Result im gets:
9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1,
UPDATE 4: Problem Solved! Knowing the sample code from https://github.com/rabbitmq/rabbitmq-priority-queue works in my environment, I believe the problem is related to the spring context. Therefore, after countless attempts and errors with various types of configurations, I indicate the exact combination that will make it work! and corresponds to the following:
<rabbit:queue name="ad_google_dfa_reporting_queue"> <rabbit:queue-arguments> <entry key="x-max-priority"> <value type="java.lang.Integer">10</value> </entry> </rabbit:queue-arguments> </rabbit:queue>
Without a special definition, the value is of type Integer; the priority queue does not work. Finally, it is resolved. Hooray!