MQ Rabbit - Reconnect / Channel / Consumer

I am creating a user that runs in an infinite loop to read messages from a queue. I am looking for advice / example code on how to restore abd continue in my infinite loop, even if there are network failures. The consumer must remain on as it will be installed as a WindowsService.

1) Can someone explain how to use these settings correctly? What is the difference between the two?

NetworkRecoveryInterval 
AutomaticRecoveryEnabled
RequestedHeartbeat

2) Please see my current sample code for the consumer. I am using .Net RabbitMQ Client v3.5.6.

How will the above settings for "recovery" be performed? for example, will user.Queue.Dequeue block until it is restored? It doesn't seem right so ...

Do I need to enter the code for this manually? for example, will user.Queue.Dequeue throw an exception for which I have to detect and manually recreate my connection, channel and consumer? Or just a consumer, since AutomaticRecovery will restore the channel for me?

Does this mean that I should move the consumer creation inside the while loop? how about creating a channel? and making a connection?

3) Assuming I have to execute some of this recovery code manually, are there any event callbacks (and how do I register for them) to let me know that there are network problems?

Thanks!

public void StartConsumer(string queue)
{
            using (IModel channel = this.Connection.CreateModel())
            {
                var consumer = new QueueingBasicConsumer(channel);
                const bool noAck = false;
                channel.BasicConsume(queue, noAck, consumer);

                // do I need these conditions? or should I just do while(true)???
                while (channel.IsOpen &&        
                       Connection.IsOpen &&     
                       consumer.IsRunning)
                {
                    try
                    {
                        BasicDeliverEventArgs item;
                        if (consumer.Queue.Dequeue(Timeout, out item))
                        {
                            string message = System.Text.Encoding.UTF8.GetString(item.Body);
                            DoSomethingMethod(message);
                            channel.BasicAck(item.DeliveryTag, false);
                        }
                    }
                    catch (EndOfStreamException ex)
                    {   
                        // this is likely due to some connection issue -- what am I to do?
                    }
                    catch (Exception ex)
                    {   
                        // should never happen, but lets say my DoSomethingMethod(message); throws an exception
                        // presumably, I'll just log the error and keep on going
                    }
                }
            }
}

        public IConnection Connection
        {
            get
            {
                if (_connection == null) // _connection defined in class -- private static IConnection _connection;
                {
                     _connection = CreateConnection();
                }
                return _connection;
            }
        }

        private IConnection CreateConnection()
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "RabbitMqHostName",
                UserName = "RabbitMqUserName",
                Password = "RabbitMqPassword",
            };

            // why do we need to set this explicitly? shouldn't this be the default?
            factory.AutomaticRecoveryEnabled = true;

            // what is a good value to use?
            factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(5); 

            // what is a good value to use? How is this different from NetworkRecoveryInterval?
            factory.RequestedHeartbeat = 5; 

            IConnection connection = factory.CreateConnection();
            return connection;
        }
+4
source share
1 answer

Features of RabbitMq

RabbitMq . , , , . ( ) :

  • .
  • .
  • basic.qos,

NetworkRecoveryInterval - , ( - 5 .)

Heartbeat - TCP-. , RabbitMq.

. EndOfStreamException ( ), , - . , , , . , . RabbitMq, . , , : .

RawRabbit, . , - ? , QueueingBasicConsumer EventingBasicConsumer. , .

var eventConsumer = new EventingBasicConsumer(channel);
eventConsumer.Received += (sender, args) =>
{
    var body = args.Body;
    eventConsumer.Model.BasicAck(args.DeliveryTag, false);
};
channel.BasicConsume(queue, false, eventConsumer);

, RabbitMq . ConsumerCancelled Shutdown Registered, , .

+11

Source: https://habr.com/ru/post/1615810/


All Articles