MassTransit loses messages with RabbitMQ when the publisher and consumer endpoint names match

We are faced with a situation where MassTransit loses messages if you create a publisher and a consumer using the same endpoint name.

Look at the code below: if I use a different endpoint name for the consumer or the publisher (for example, "rabbitmq://localhost/mtlossPublised" for the publisher), the published and consumed message counts match; if I use the same endpoint name (as in the example), fewer messages are consumed than were published.

Is this the expected behavior, or am I doing something wrong? A working example is below.

    using MassTransit;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;

    namespace MTMessageLoss
    {
        class Program
        {
            static void Main(string[] args)
            {
                var consumerBus = ServiceBusFactory.New(b =>
                {
                    b.UseRabbitMq();
                    b.UseRabbitMqRouting();
                    b.ReceiveFrom("rabbitmq://localhost/mtloss");
                });

                var publisherBus = ServiceBusFactory.New(b =>
                {
                    b.UseRabbitMq();
                    b.UseRabbitMqRouting();
                    b.ReceiveFrom("rabbitmq://localhost/mtloss");
                });

                consumerBus.SubscribeConsumer(() => new MessageConsumer());

                for (int i = 0; i < 10; i++)
                    publisherBus.Publish(new SimpleMessage()
                    {
                        CorrelationId = Guid.NewGuid(),
                        Message = string.Format("This is message {0}", i)
                    });

                Console.WriteLine("Press ENTER Key to see how many you consumed");
                Console.ReadLine();
                Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.", MessageConsumer.Count);
                Console.ReadLine();

                consumerBus.Dispose();
                publisherBus.Dispose();
            }
        }

        public interface ISimpleMessage : CorrelatedBy<Guid>
        {
            string Message { get; }
        }

        public class SimpleMessage : ISimpleMessage
        {
            public Guid CorrelationId { get; set; }
            public string Message { get; set; }
        }

        public class MessageConsumer : Consumes<ISimpleMessage>.All
        {
            public static int Count = 0;

            public void Consume(ISimpleMessage message)
            {
                System.Threading.Interlocked.Increment(ref Count);
            }
        }
    }
2 answers

Bottom line: every bus instance needs its own queue to read from, even if the bus exists only to publish messages. That is simply a requirement of how MassTransit works.

http://masstransit.readthedocs.org/en/master/configuration/config_api.html#basic-options - see warning.

We leave the behavior undefined when two bus instances share the same queue. Either way, it is not something we support. Each bus instance may send metadata to other bus instances and needs its own endpoint. This was a much bigger deal with MSMQ, so perhaps we could make this case work on RabbitMQ, but it is not something we have given much thought to so far.
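As a minimal sketch of that requirement, the two bus declarations from the question could be given separate queues. It uses only the configuration calls that already appear in the question, and the second queue name is the one the question itself mentions; it assumes the same MassTransit version and a RabbitMQ broker on localhost.

```csharp
using MassTransit;

static class SeparateQueues
{
    static void Main()
    {
        // The consuming bus keeps the original queue.
        var consumerBus = ServiceBusFactory.New(b =>
        {
            b.UseRabbitMq();
            b.UseRabbitMqRouting();
            b.ReceiveFrom("rabbitmq://localhost/mtloss");
        });

        // The publishing bus gets a queue of its own, even though it only publishes.
        // The name (spelled as in the question) is arbitrary; it only has to differ.
        var publisherBus = ServiceBusFactory.New(b =>
        {
            b.UseRabbitMq();
            b.UseRabbitMqRouting();
            b.ReceiveFrom("rabbitmq://localhost/mtlossPublised");
        });

        // ... subscribe on consumerBus and publish on publisherBus as in the question ...

        consumerBus.Dispose();
        publisherBus.Dispose();
    }
}
```

According to the question, this is the configuration in which the published and consumed counts match.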


What happens is that by giving both buses the same receive URI, you tell MT to load-balance consumption across the two buses; however, only one of those buses has a consumer listening for the messages.

If you track which messages are received, you will see that it is (almost) every second one.
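To make the "every second message" effect concrete, here is a toy model in plain C#. It is not MassTransit or RabbitMQ code: the class and method names are hypothetical, and it assumes a strictly alternating hand-out of messages between the readers of one queue, which is an idealization (a real run is less regular).

```csharp
using System;
using System.Collections.Generic;

public static class SharedQueueModel
{
    // Hands messages 0..count-1 to `readers` readers of one queue in strict rotation.
    // Only reader 0 has a consumer subscribed; returns the ids that reader handled.
    public static List<int> Deliver(int count, int readers)
    {
        var handled = new List<int>();
        for (int id = 0; id < count; id++)
        {
            int reader = id % readers;   // assumption: perfect round-robin
            if (reader == 0)
                handled.Add(id);         // the other readers have no consumer
        }
        return handled;
    }

    public static void Main()
    {
        var handled = Deliver(10, 2);
        Console.WriteLine("Handled {0} of 10: {1}", handled.Count, string.Join(", ", handled));
        // prints "Handled 5 of 10: 0, 2, 4, 6, 8"
    }
}
```

With a single reader (`Deliver(10, 1)`) every message is handled, which corresponds to each bus having its own queue.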

By changing your sample code, I get:

    We consumed 6 simple messages. Press Enter to terminate the applicaion.
    Received 0
    Received 3
    Received 5
    Received 6
    Received 7
    Received 8

Subscribe the consumer on the other bus as well and you get them all:

    We consumed 10 simple messages. Press Enter to terminate the applicaion.
    Received 0
    Received 1
    Received 2
    Received 3
    Received 4
    Received 5
    Received 6
    Received 7
    Received 8
    Received 9

So yes, I would say that this is the expected behavior.

Here is the modified sample code with two subscribers:

    using MassTransit;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;

    namespace MTMessageLoss
    {
        class Program
        {
            internal static bool[] msgReceived = new bool[10];

            static void Main(string[] args)
            {
                var consumerBus = ServiceBusFactory.New(b =>
                {
                    b.UseRabbitMq();
                    b.UseRabbitMqRouting();
                    b.ReceiveFrom("rabbitmq://localhost/mtloss");
                });

                var publisherBus = ServiceBusFactory.New(b =>
                {
                    b.UseRabbitMq();
                    b.UseRabbitMqRouting();
                    b.ReceiveFrom("rabbitmq://localhost/mtloss");
                });

                publisherBus.SubscribeConsumer(() => new MessageConsumer());
                consumerBus.SubscribeConsumer(() => new MessageConsumer());

                for (int i = 0; i < 10; i++)
                    consumerBus.Publish(new SimpleMessage() { CorrelationId = Guid.NewGuid(), MsgId = i });

                Console.WriteLine("Press ENTER Key to see how many you consumed");
                Console.ReadLine();
                Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.", MessageConsumer.Count);

                for (int i = 0; i < 10; i++)
                    if (msgReceived[i])
                        Console.WriteLine("Received {0}", i);

                Console.ReadLine();

                consumerBus.Dispose();
                publisherBus.Dispose();
            }
        }

        public interface ISimpleMessage : CorrelatedBy<Guid>
        {
            int MsgId { get; }
        }

        public class SimpleMessage : ISimpleMessage
        {
            public Guid CorrelationId { get; set; }
            public int MsgId { get; set; }
        }

        public class MessageConsumer : Consumes<ISimpleMessage>.All
        {
            public static int Count = 0;

            public void Consume(ISimpleMessage message)
            {
                Program.msgReceived[message.MsgId] = true;
                System.Threading.Interlocked.Increment(ref Count);
            }
        }
    }

Source: https://habr.com/ru/post/1434590/

