Spring AMQP RabbitMQ implementing priority queue

After Googling for a few days, I believe I am completely lost. I would like to implement a kind of priority queue made up of about 3 queues:

  • a high priority queue (daily), which must be processed first.
  • a medium priority queue (weekly), which is processed only if there are no items in queue #1 (it is OK if a message in this queue is never processed at all)
  • a low priority queue (monthly), which is processed only if there are no items in queues #1 and #2 (it is OK if a message in this queue is never processed at all)

Initially I had the following flow: a consumer that consumes messages from all three queues and checks whether there are any items in queues #1, #2 and #3. Then I realized that this is wrong, because:

  • I am completely lost on the question: "How do I know which queue a message came from?"
  • I have already consumed the message regardless of which queue it came from. So if I get an item from a lower priority queue, do I have to put it back on the queue when I find out that there is a message in a higher priority queue?
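The scheme described above amounts to pulling rather than being pushed to: ask the queues in priority order and take a message from a lower queue only when every higher queue is empty, so nothing ever has to be put back. A minimal sketch of that idea follows, using in-memory java.util queues as stand-ins for the three RabbitMQ queues. The class name StrictPriorityPoller and the message names are hypothetical, and this is not Spring AMQP code.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical illustration: in-memory queues stand in for the broker queues.
class StrictPriorityPoller {
    // Queues ordered from highest to lowest priority (daily, weekly, monthly).
    private final List<Queue<String>> queuesByPriority;

    StrictPriorityPoller(List<Queue<String>> queuesByPriority) {
        this.queuesByPriority = queuesByPriority;
    }

    // Returns the next message from the first non-empty queue, if any.
    Optional<String> next() {
        for (Queue<String> queue : queuesByPriority) {
            String message = queue.poll(); // null when this queue is empty
            if (message != null) {
                return Optional.of(message);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Queue<String> daily = new ArrayDeque<>();
        Queue<String> weekly = new ArrayDeque<>();
        Queue<String> monthly = new ArrayDeque<>();
        monthly.add("monthly-1");
        weekly.add("weekly-1");
        daily.add("daily-1");

        StrictPriorityPoller poller =
                new StrictPriorityPoller(Arrays.asList(daily, weekly, monthly));
        Optional<String> message;
        while ((message = poller.next()).isPresent()) {
            System.out.println(message.get()); // daily-1, weekly-1, monthly-1
        }
    }
}
```

Because the queue is chosen before anything is taken from it, the poller always knows where a message came from. Against a real broker this would mean polling each queue instead of registering push listeners, which costs a round trip per check.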

Below are my current configurations, which show that I'm an idiot.

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
                               http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
                               http://www.springframework.org/schema/rabbit
                               http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

        <rabbit:connection-factory id="connectionFactory" host="localhost" />

        <rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory"
                         exchange="" routing-key="daily_queue"/>
        <rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory"
                         exchange="" routing-key="weekly_queue"/>
        <rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory"
                         exchange="" routing-key="monthly_queue"/>

        <rabbit:admin connection-factory="connectionFactory" />

        <rabbit:listener-container connection-factory="connectionFactory">
            <rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" />
        </rabbit:listener-container>

        <rabbit:listener-container connection-factory="connectionFactory">
            <rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" />
        </rabbit:listener-container>

        <rabbit:listener-container connection-factory="connectionFactory">
            <rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" />
        </rabbit:listener-container>

        <bean id="Consumer" class="com.test.Consumer" />
    </beans>

Any idea how I can solve this priority queue problem?

ps: I also wonder if Apache Camel has something I can rely on?

UPDATE 1: I just saw this for Apache Camel: https://issues.apache.org/jira/browse/CAMEL-2537 . The sequencer on JMSPriority seems to be what I'm looking for. Has anyone tried this before?

UPDATE 2: Assuming I use the RabbitMQ plugin, based on @Gary Russell's recommendation, I have the following spring-rabbitmq context XML configuration, which seems to make sense (at a guess...):

    <rabbit:queue name="ad_google_dfa_reporting_queue">
        <rabbit:queue-arguments>
            <entry key="x-max-priority" value="10"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="adGoogleDfaReporting" method="consume"
                         queue-names="ad_google_dfa_reporting_queue" />
    </rabbit:listener-container>

    <bean id="Consumer" class="com.test.Consumer" />

The XML configuration above successfully created a queue named "ad_google_dfa_reporting_queue" with the arguments x-max-priority: 10 and durable: true.

But when it comes to the code that sends a message with a priority, I am completely lost. How do I set the priority the way the example at this URL does: https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java

    AmqpTemplate amqpTemplateGoogleDfaReporting =
            (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting");
    amqpTemplateGoogleDfaReporting.convertAndSend("message"); // how to define message priority?

UPDATE 3: Based on @Gary's answer, I managed to send messages with the priority set in the message properties:

[image: message priority screenshot]

However, when I sent 1000 messages with random priorities between 1 and 10, the consumer consumed messages of all priorities in no particular order (I expected the highest priority messages to be consumed first). Below is the code for the message producer:

    Random random = new Random();
    for (int i = 0; i < 1000; i++) {
        final int priority = random.nextInt(10 - 1 + 1) + 1;

        DfaReportingModel model = new DfaReportingModel();
        model.setReportType(DfaReportingModel.ReportType.FACT);
        model.setUserProfileId(0L + priority);

        amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setPriority(priority);
                return message;
            }
        });
    }
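As a side note on the expression random.nextInt(10 - 1 + 1) + 1 used in the producer: it is the usual way to draw an integer from an inclusive range, here 1 to 10. A small sketch of the general form, with a hypothetical helper name randomPriority:

```java
import java.util.Random;

// Hypothetical helper, not part of the original producer code.
class PriorityRange {
    // Returns a uniformly distributed integer in [min, max], both inclusive.
    static int randomPriority(Random random, int min, int max) {
        // nextInt(bound) yields 0..bound-1, so the bound is the size of the range.
        return random.nextInt(max - min + 1) + min;
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(randomPriority(random, 1, 10));
        }
    }
}
```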

And the following code for the message consumer:

    public void consume(DfaReportingModel message) throws InterruptedException {
        System.out.println(message.getUserProfileId());
        Thread.sleep(500); // pause 500 ms between messages
    }

The result I get:

 9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1, 

UPDATE 4: Problem solved! Knowing that the sample code from https://github.com/rabbitmq/rabbitmq-priority-queue works in my environment, I assumed the problem was in the Spring context. After countless rounds of trial and error with different configurations, I found the exact combination that makes it work:

    <rabbit:queue name="ad_google_dfa_reporting_queue">
        <rabbit:queue-arguments>
            <entry key="x-max-priority">
                <!-- MUST specifically define java.lang.Integer to get it to work -->
                <value type="java.lang.Integer">10</value>
            </entry>
        </rabbit:queue-arguments>
    </rabbit:queue>

Unless the value is explicitly declared as type Integer, the priority queue does not work (presumably because a plain value="10" attribute is passed along as a String). Finally, it is resolved. Hooray!
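The difference between the two configurations comes down to the Java type of the value stored under x-max-priority. The sketch below shows that distinction with plain maps. The class QueueArguments and the check hasIntegerMaxPriority are hypothetical, and the claim that an untyped XML attribute ends up as a String is my understanding of Spring's map handling rather than something confirmed in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the type difference between the two configurations.
class QueueArguments {
    // Builds the arguments map with x-max-priority as a numeric value.
    static Map<String, Object> priorityArgs(int maxPriority) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", maxPriority); // autoboxed to java.lang.Integer
        return args;
    }

    // Hypothetical sanity check: true only if the argument is an Integer.
    static boolean hasIntegerMaxPriority(Map<String, Object> args) {
        return args.get("x-max-priority") instanceof Integer;
    }

    public static void main(String[] args) {
        Map<String, Object> typed = priorityArgs(10);

        Map<String, Object> untyped = new HashMap<>();
        untyped.put("x-max-priority", "10"); // assumed result of a plain value="10"

        System.out.println(hasIntegerMaxPriority(typed));   // true
        System.out.println(hasIntegerMaxPriority(untyped)); // false
    }
}
```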

2 answers

RabbitMQ now has a priority queue plugin, where messages are delivered in priority order. It would be better to use that than your scheme of requeueing low priority messages, which will be quite expensive at runtime.

EDIT:

When using the rabbitTemplate.convertAndSend(...) methods, if you want to set the priority property on the message, you need to either implement a custom MessagePropertiesConverter in the template (a subclass of DefaultMessagePropertiesConverter) or use the convertAndSend variants that accept a MessagePostProcessor, e.g.:

    template.convertAndSend("exchange", "routingKey", "message", new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setPriority(5);
            return message;
        }
    });

RabbitMQ has had a priority queue implementation in the core broker since version 3.5.0.

You can declare priority queues using the x-max-priority argument. This argument must be an integer indicating the maximum priority that the queue should support. For example, using the Java client:

    Channel ch = ...;
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-max-priority", 10);
    ch.queueDeclare("my-priority-queue", true, false, false, args);

You can then publish prioritised messages using the priority field of basic.properties. Larger numbers indicate higher priority.
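To make the "larger numbers indicate higher priority" rule concrete, here is an in-memory analogy using java.util.PriorityQueue. It is only a sketch of the ordering, not of the broker itself: the class PriorityOrderDemo and its methods are hypothetical, and the first-in-first-out tie-break among equal priorities is an assumption built into the sketch via a sequence number.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical in-memory analogy of a priority queue; not RabbitMQ client code.
class PriorityOrderDemo {
    private static final class Msg {
        final String body;
        final int priority;
        final long seq; // arrival order, used to break ties

        Msg(String body, int priority, long seq) {
            this.body = body;
            this.priority = priority;
            this.seq = seq;
        }
    }

    // Highest priority first; among equal priorities, earliest arrival first.
    private final PriorityQueue<Msg> queue = new PriorityQueue<>(
            Comparator.comparingInt((Msg m) -> m.priority).reversed()
                      .thenComparingLong(m -> m.seq));
    private long nextSeq = 0;

    void publish(String body, int priority) {
        queue.add(new Msg(body, priority, nextSeq++));
    }

    // Returns the next message body, or null when the queue is empty.
    String deliver() {
        Msg m = queue.poll();
        return m == null ? null : m.body;
    }

    public static void main(String[] args) {
        PriorityOrderDemo demo = new PriorityOrderDemo();
        demo.publish("low", 1);
        demo.publish("high-a", 9);
        demo.publish("mid", 5);
        demo.publish("high-b", 9);

        String body;
        while ((body = demo.deliver()) != null) {
            System.out.println(body); // high-a, high-b, mid, low
        }
    }
}
```

The ordering only applies to messages that are still waiting in the queue. If the consumer keeps up with the producer, each message is delivered as it arrives and the priorities have nothing to reorder.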


Source: https://habr.com/ru/post/977967/

