Reading from multiple queues, RabbitMQ

I am new to RabbitMQ. I want to be able to handle reading messages without blocking when there are several queues (for reading). Any data on how I can do this?

// Edit 1

/// <summary>
/// RabbitMQ implementation of <c>IMessageBus</c>: publishes serialized
/// <c>Measurement</c> objects through the <c>amq.direct</c> exchange and reads
/// them back through a single <c>Subscription</c>.
/// </summary>
public class Rabbit : IMessageBus
{
    private List<string> publishQ = new List<string>();
    private List<string> subscribeQ = new List<string>();
    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;
    Subscription sub = null;

    /// <summary>
    /// Serializes <paramref name="m1"/> and publishes it once to the queue named
    /// after <c>m1.id</c>, declaring and binding that queue first.
    /// </summary>
    /// <param name="m1">Measurement to publish; its <c>id</c> is used as the queue name.</param>
    public void writeMessage(Measurement m1)
    {
        byte[] body = Measurement.AltSerialize(m1);

        // The placeholder used to be {1}; with a single argument that throws
        // FormatException before anything is published.
        Console.WriteLine("Sending message to queue {0} via the amq.direct exchange.", m1.id);

        string finalQueue = publishToQueue(m1.id);
        channel.BasicPublish("amq.direct", finalQueue, null, body);

        Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id);
    }

    /// <summary>
    /// Declares the queue and binds it to <c>amq.direct</c> using the queue name
    /// as the routing key.
    /// </summary>
    /// <param name="firstQueueName">Requested queue name.</param>
    /// <returns>The queue name returned by <c>QueueDeclare</c>.</returns>
    public string publishToQueue(string firstQueueName)
    {
        Console.WriteLine("Creating a queue and binding it to amq.direct");
        string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null);
        channel.QueueBind(queueName, "amq.direct", queueName, null);
        Console.WriteLine("Done. Created queue {0} and bound it to amq.direct.\n", queueName);
        return queueName;
    }

    /// <summary>
    /// Reads one message from the current subscription and acknowledges it.
    /// Enumerating the subscription waits until a delivery is available.
    /// </summary>
    /// <returns>
    /// The deserialized measurement, or a default-constructed one if the
    /// subscription yields nothing.
    /// </returns>
    /// <exception cref="System.InvalidOperationException">
    /// <c>subscribeToQueue</c> has not been called yet.
    /// </exception>
    public Measurement readMessage()
    {
        if (sub == null)
        {
            throw new System.InvalidOperationException(
                "subscribeToQueue must be called before readMessage.");
        }

        Console.WriteLine("Receiving message...");
        Measurement m = new Measurement();

        foreach (BasicDeliverEventArgs ev in sub)
        {
            m = Measurement.AltDeSerialize(ev.Body);
            // TODO: set m.id from the subscription (left open in the original).

            // Ack BEFORE leaving the loop: the original broke out first, so the
            // delivery was never acknowledged.
            sub.Ack();
            break;
        }

        Console.WriteLine("Done.\n");
        return m;
    }

    /// <summary>Replaces the current subscription with one on <paramref name="queueName"/>.</summary>
    /// <param name="queueName">Queue to consume from.</param>
    public void subscribeToQueue(string queueName)
    {
        sub = new Subscription(channel, queueName);
    }

    public static string MsgSysName;

    /// <summary>Name of the messaging system; backed by the static <c>MsgSysName</c>.</summary>
    public string MsgSys
    {
        get { return MsgSysName; }
        set { MsgSysName = value; }
    }

    /// <summary>Connects to the broker on localhost and opens one channel.</summary>
    /// <param name="_msgSys">Messaging-system name stored in <c>MsgSys</c>.</param>
    public Rabbit(string _msgSys)
    {
        factory = new ConnectionFactory();
        factory.HostName = "localhost";
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        //consumer = new QueueingBasicConsumer(channel);
        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    ~Rabbit()
    {
        //observer??
        // The finalizer also runs when the constructor threw before the
        // connection was created; guard so it does not dereference null.
        if (connection != null)
        {
            connection.Dispose();
        }
        //channel.Dispose();
        System.Console.WriteLine("\nDestroying RABBIT");
    }
}

// Edit 2

 private List<Subscription> subscriptions = new List<Subscription>();
 Subscription sub = null;

 /// <summary>
 /// Polls every registered subscription once and returns the first message found.
 /// </summary>
 /// <returns>
 /// The deserialized measurement, or <c>null</c> when no subscription had a
 /// message within the poll timeout.
 /// </returns>
 public Measurement readMessage()
 {
     // Short per-queue wait so an empty queue cannot stall the scan of the others.
     const int pollTimeoutMs = 100;

     foreach (Subscription element in subscriptions)
     {
         BasicDeliverEventArgs ev;

         // The original enumerated the subscription with foreach, which waits
         // for a delivery and so blocked on the first empty queue. The timed
         // Next overload returns false on timeout instead.
         if (element.Next(pollTimeoutMs, out ev) && ev != null)
         {
             Measurement m = Measurement.AltDeSerialize(ev.Body);
             // Acknowledge so the broker does not redeliver this message.
             element.Ack();
             return m;
         }
     }

     System.Console.WriteLine("No message in the queue(s) at this time.");
     // Return null (not a blank Measurement) so callers can test for "no message".
     return null;
 }

 /// <summary>Subscribes to <paramref name="queueName"/> and adds it to the polled set.</summary>
 /// <param name="queueName">Queue to consume from.</param>
 public void subscribeToQueue(string queueName)
 {
     sub = new Subscription(channel, queueName);
     subscriptions.Add(sub);
 }

// Edit 3

 //MessageHandler.cs
 /// <summary>
 /// RabbitMQ message handler: publishes <c>Measurement</c> objects through
 /// <c>amq.direct</c> and polls any number of subscribed queues for messages.
 /// </summary>
 public class MessageHandler
 {
     // Implementation of methods for Rabbit class go here

     // Per-queue wait used by readMessage so an empty queue cannot stall the others.
     private const int PollTimeoutMs = 100;

     private List<string> publishQ = new List<string>();
     private List<string> subscribeQ = new List<string>();
     ConnectionFactory factory = null;
     IConnection connection = null;
     IModel channel = null;
     QueueingBasicConsumer consumer = null;
     private List<Subscription> subscriptions = new List<Subscription>();
     Subscription sub = null;

     /// <summary>
     /// Serializes <paramref name="m1"/> and publishes it with routing key
     /// <c>m1.id</c>, declaring and binding the matching queue first.
     /// </summary>
     /// <param name="m1">Measurement to publish; its <c>id</c> names the queue.</param>
     public void writeMessage(Measurement m1)
     {
         byte[] body = Measurement.AltSerialize(m1);
         //declare a queue if it doesn't exist
         publishToQueue(m1.id);
         channel.BasicPublish("amq.direct", m1.id, null, body);
         Console.WriteLine("\n [x] Sent to queue {0}.", m1.id);
     }

     /// <summary>
     /// Declares the queue and binds it to <c>amq.direct</c> under its own name.
     /// </summary>
     /// <param name="queueName">Queue to declare and bind.</param>
     public void publishToQueue(string queueName)
     {
         string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null);
         // The binding key was "" while writeMessage publishes with the queue
         // name as routing key; a direct exchange needs the two to match.
         channel.QueueBind(finalQueueName, "amq.direct", finalQueueName, null);
     }

     /// <summary>
     /// Polls every subscription once and returns the first message found,
     /// tagging it with the name of the queue it came from.
     /// </summary>
     /// <returns>
     /// The deserialized measurement, or <c>null</c> when no queue had a
     /// message within the poll timeout.
     /// </returns>
     public Measurement readMessage()
     {
         foreach (Subscription element in subscriptions)
         {
             if (element.QueueName == null)
             {
                 continue;
             }

             BasicDeliverEventArgs ev;
             // The parameterless Next() waits for a delivery, which blocked the
             // scan on the first empty queue. The timed overload returns false
             // on timeout so the loop can move on.
             if (element.Next(PollTimeoutMs, out ev) && ev != null)
             {
                 Measurement m = Measurement.AltDeSerialize(ev.Body);
                 m.id = element.QueueName;
                 // Ack only when a delivery was actually received.
                 element.Ack();
                 return m;
             }
         }

         System.Console.WriteLine("No message in the queue(s) at this time.");
         return null;
     }

     /// <summary>Subscribes to <paramref name="queueName"/> and adds it to the polled set.</summary>
     /// <param name="queueName">Queue to consume from.</param>
     public void subscribeToQueue(string queueName)
     {
         sub = new Subscription(channel, queueName);
         subscriptions.Add(sub);
     }

     public static string MsgSysName;

     /// <summary>Name of the messaging system; backed by the static <c>MsgSysName</c>.</summary>
     public string MsgSys
     {
         get { return MsgSysName; }
         set { MsgSysName = value; }
     }

     /// <summary>Connects to the broker on localhost and opens one channel.</summary>
     /// <param name="_msgSys">Messaging-system name stored in <c>MsgSys</c>.</param>
     public MessageHandler(string _msgSys)
     {
         factory = new ConnectionFactory();
         factory.HostName = "localhost";
         connection = factory.CreateConnection();
         channel = connection.CreateModel();
         consumer = new QueueingBasicConsumer(channel);
         System.Console.WriteLine("\nMsgSys: RabbitMQ");
         MsgSys = _msgSys;
     }

     /// <summary>
     /// Releases broker resources innermost first: subscriptions, then the
     /// channel, then the connection.
     /// </summary>
     public void disposeAll()
     {
         // The original disposed the connection first and only then touched
         // the channel and subscriptions that depend on it.
         foreach (Subscription element in subscriptions)
         {
             element.Close();
         }
         subscriptions.Clear();

         channel.Dispose();
         connection.Dispose();
         System.Console.WriteLine("\nDestroying RABBIT");
     }
 }

//App1.cs

 using System;
 using System.IO;
 using UtilityMeasurement;
 using UtilityMessageBus;

 /// <summary>
 /// Publisher demo: creates two test measurements, asks for two queue names,
 /// declares those queues and publishes the measurements.
 /// </summary>
 public class MainClass
 {
     public static void Main()
     {
         // `new` was missing, so the original did not compile.
         MessageHandler obj1 = new MessageHandler("Rabbit");
         // MsgSysName is not in scope in this class; read it through the instance.
         System.Console.WriteLine("\nA {0} object is now created.", obj1.MsgSys);

         //Create new Measurement messages
         Measurement m1 = new Measurement("q1", 2345, 23.456);
         Measurement m2 = new Measurement("q2", 222, 33.33);

         System.Console.WriteLine("Test message 1:\n ID: {0}", m1.id);
         System.Console.WriteLine(" Time: {0}", m1.time);
         System.Console.WriteLine(" Value: {0}", m1.value);

         System.Console.WriteLine("Test message 2:\n ID: {0}", m2.id);
         System.Console.WriteLine(" Time: {0}", m2.time);
         System.Console.WriteLine(" Value: {0}", m2.value);

         // Ask queue name and store it
         System.Console.WriteLine("\nName of queue to publish to: ");
         string queueName = (System.Console.ReadLine()).ToString();
         obj1.publishToQueue( queueName );

         // Write message to the queue
         // NOTE(review): writeMessage routes by m1.id, not by the name entered
         // above. Confirm this is intended.
         obj1.writeMessage( m1 );

         System.Console.WriteLine("\nName of queue to publish to: ");
         string queueName2 = (System.Console.ReadLine()).ToString();
         obj1.publishToQueue( queueName2 );
         obj1.writeMessage( m2 );

         obj1.disposeAll();
     }
 }

//App2.cs

 using System;
 using System.IO;
 using UtilityMeasurement;
 using UtilityMessageBus;

 /// <summary>
 /// Subscriber demo: picks a message bus by name, subscribes to two queues in
 /// turn and prints whatever message each read returns.
 /// </summary>
 public class MainClass
 {
     public static void Main()
     {
         //Asks for the message system
         System.Console.WriteLine("\nEnter name of messageing system: ");
         System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
         string MsgSysName = (System.Console.ReadLine()).ToString();

         // The factory hands back the bus implementation matching the entered
         // name (ex. Rabbit, Zmq, etc).
         IMessageBus bus = MessageBusFactory.GetMessageBus(MsgSysName);
         System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

         //Create a new Measurement object m
         Measurement received = new Measurement();

         System.Console.WriteLine("Queue name to subscribe to: ");
         string firstQueue = (System.Console.ReadLine()).ToString();
         bus.subscribeToQueue(firstQueue);
         received = bus.readMessage();
         Report(received);

         System.Console.WriteLine("Another queue name to subscribe to: ");
         string secondQueue = (System.Console.ReadLine()).ToString();
         bus.subscribeToQueue(secondQueue);
         received = bus.readMessage();
         Report(received);

         bus.disposeAll();
     }

     // Prints id, time and value of a received measurement; prints nothing for null.
     private static void Report(Measurement m)
     {
         if (m == null)
         {
             return;
         }

         System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id);
         System.Console.WriteLine(" Time: {0}", m.time);
         System.Console.WriteLine(" Value: {0}", m.value);
     }
 }
+6
source share
2 answers

two sources of information:

Useful operations for understanding:

  • Announce / Approve / Listen / Sign / Publish

Re: your question — there is no reason why you cannot have multiple listeners. Or you can subscribe to n routing keys with one listener per “exchange”.

** re: non-blocking **

A typical listener consumes messages one at a time. You can pull them off the queue yourself, or they will be delivered to the consumer automatically in a "windowed" fashion (the window is set with the quality-of-service (QoS) parameters). The beauty of the approach is that a lot of the hard work is done for you (re: reliability, guaranteed delivery, etc.).

A key feature of RabbitMQ is that when an error occurs in processing, the message is again added to the queue (fault tolerance function).

You need to know more about your situation.

Often, if you post to the mailing list mentioned above, you will get a reply from one of the RabbitMQ staff. They are very helpful.

Hope this helps. It is a lot to take in when you dive in head first, but it is worth persevering.


Q & A

see http://www.rabbitmq.com/faq.html

Q. Can you subscribe to multiple queues using the new subscription (channel, queueName)?

Yes. You either use a binding key with wildcards, for example abc.*.hij or abc.#.hij, or you attach several bindings. The first assumes that you have designed your routing keys around a principle that makes sense to you (see Routing Keys in the FAQ). For the latter, you need to bind to multiple queues.

Implementing n-bindings manually. see http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

There is not much code in this pattern, so you can roll your own subscription pattern if wildcards are not enough. You can inherit from this class and add another method for additional bindings ... maybe this will work, or something close to it (untested).

The AMQP specification says that multiple manual bindings are possible: http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind

Q. And if so, how can I go through all the subscribed queues and return a message (null when there are no messages)?

With a subscriber, you receive a notification when a message is available. Otherwise, what you are describing is the pull interface, where you pull a message on demand. If there are no messages, you will get null, as you want. By the way: the Notify method is probably more convenient.

Q. Oh, and remember that I have all these operations in different methods. I will edit my post to reflect the code

Live Code:

this version should use wild cards to subscribe to multiple routing keys

n manual routing keys using a subscription remain as an exercise for the reader. ;-) I think you were inclined towards the push interface anyway. btw: pull interfaces are less efficient than notifications.

  using (Subscription sub = new Subscription(ch, QueueNme)) { foreach (BasicDeliverEventArgs ev in sub) { Process(ev.Body); ... 

Note: foreach uses IEnumerable, and IEnumerable wraps the event when a new message arrives through the yield statement. Effectively this is an infinite loop.

--- UPDATE

AMQP was designed with the fact that the number of TCP connections does not exceed the number of applications, so you can have many channels for each connection.

The code in this question (edit 3) is trying to use two subscribers with one channel, while it should (I believe) be one subscriber per channel per thread to avoid blocking problems. Suggestion: use a wildcard routing key. You can subscribe to several queue names with the Java client, but the .NET client, as far as I know, has not implemented this in the Subscription helper class.

If you really need two different queue names in the same subscription thread, then for .NET:

  using (IModel ch = conn.CreateModel())
  {
      // btw: no reason to close the channel afterwards IMO
      conn.AutoClose = true; // no reason to close the connection either. Here for completeness.

      ch.QueueDeclare(queueName);

      // Single pull from the queue; a null result means the queue was empty.
      BasicGetResult delivery = ch.BasicGet(queueName, false);
      if (delivery != null)
      {
          ch.BasicAck(delivery.DeliveryTag, false);
          Console.WriteLine("Message:");
      }
      else
      {
          Console.WriteLine("No message available.");
      }

      return 0;
  }

- UPDATE 2:

from the RabbitMQ list:

"Presumably element.Next() is blocking on one of the subscriptions. You can retrieve deliveries from each subscription with a timeout to read past it. Alternatively, you can set up a single queue to receive all measurements and retrieve messages from it with a single subscription." (Emile)

This means that when the first queue is empty, .Next() blocks waiting for the next message to appear, i.e. the subscriber has a built-in wait-for-next-message.

- UPDATE 3:

in .net, use QueueingBasicConsumer to consume from multiple queues.

Actually, there is a thread about this that gives an idea of the usage:

Wait for one RabbitMQ message with timeout

- UPDATE4:

Learn more about .QueueingBasicConsumer

Here is some sample code.

http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html

The example is copied into this answer with several modifications (see // <-----).

  IModel channel = ...;
  QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);

  // One consumer registered on several queues.
  channel.BasicConsume(queueName, false, null, consumer);  //<-----
  channel.BasicConsume(queueName2, false, null, consumer); //<-----
  // etc.
  channel.BasicConsume(queueNameN, false, null, consumer); //<-----

  // At this point, messages will be being asynchronously delivered,
  // and will be queueing up in consumer.Queue.
  try
  {
      for (;;)
      {
          BasicDeliverEventArgs delivery =
              (BasicDeliverEventArgs) consumer.Queue.Dequeue();

          // ... handle the delivery ...

          channel.BasicAck(delivery.DeliveryTag, false);
      }
  }
  catch (EndOfStreamException)
  {
      // The consumer was cancelled, the model closed, or the
      // connection went away; leaving the try block ends the loop.
  }

- UPDATE 5: a simple get that will work in any queue (a slower, but sometimes more convenient method).

  ch.QueueDeclare(queueName);

  // Pull a single message on demand; null means the queue is currently empty.
  BasicGetResult fetched = ch.BasicGet(queueName, false);
  if (fetched != null)
  {
      ch.BasicAck(fetched.DeliveryTag, false);
      Console.WriteLine("Message:");
      // deserialize body and display extra info here.
  }
  else
  {
      Console.WriteLine("No message available.");
  }
+12
source

The easiest way is to use EventingBasicConsumer. I have an example on my site on how to use it. RabbitMQ EventingBasicConsumer

This Consumer class provides an Received event that you can use, and therefore DOES NOT block. The rest of the code basically remains the same.

+1
source

Source: https://habr.com/ru/post/892770/


All Articles