What happens is that by giving both buses the same receive URI, you tell MassTransit (MT) to load-balance consumption across the two buses; however, you only have one bus that is actually listening for the messages.
If you make the consumer keep track of which messages are received, you will see that it is (almost) every second one.
After changing your sample code to track this, I get:
We consumed 6 simple messages. Press Enter to terminate the applicaion. Received 0 Received 3 Received 5 Received 6 Received 7 Received 8
Start a consumer on the other bus as well and you get them all:
We consumed 10 simple messages. Press Enter to terminate the applicaion. Received 0 Received 1 Received 2 Received 3 Received 4 Received 5 Received 6 Received 7 Received 8 Received 9
So yes, I would say that this is the expected behavior.
Here is the modified sample code with two subscribers:
using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MTMessageLoss
{
    /// <summary>
    /// Console sample: creates two MassTransit buses that receive from the same
    /// RabbitMQ queue URI, subscribes a consumer on each, publishes a fixed number
    /// of messages, and reports how many (and which) message ids were consumed.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Number of messages published by the sample. Also sizes
        /// <see cref="msgReceived"/>, so published MsgId values are always valid indexes.
        /// </summary>
        internal const int MessageCount = 10;

        /// <summary>
        /// msgReceived[i] is set to true by <see cref="MessageConsumer"/> when a
        /// message with MsgId == i has been consumed.
        /// </summary>
        internal static bool[] msgReceived = new bool[MessageCount];

        static void Main(string[] args)
        {
            // Both buses are configured with the same receive URI on purpose:
            // this is the setup the sample is meant to demonstrate.
            // The using statements guarantee both buses are disposed even if
            // subscribing or publishing throws.
            using (var consumerBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            }))
            using (var publisherBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            }))
            {
                // A consumer is subscribed on each bus, so whichever bus
                // picks a message up from the shared queue can handle it.
                publisherBus.SubscribeConsumer(() => new MessageConsumer());
                consumerBus.SubscribeConsumer(() => new MessageConsumer());

                for (int i = 0; i < MessageCount; i++)
                {
                    consumerBus.Publish(new SimpleMessage() { CorrelationId = Guid.NewGuid(), MsgId = i });
                }

                // Consumption happens in the background; wait for the user
                // before reading the counters.
                Console.WriteLine("Press ENTER Key to see how many you consumed");
                Console.ReadLine();

                Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.", MessageConsumer.Count);

                for (int i = 0; i < MessageCount; i++)
                {
                    if (msgReceived[i])
                    {
                        Console.WriteLine("Received {0}", i);
                    }
                }

                Console.ReadLine();
            }
        }
    }

    /// <summary>
    /// Message contract for the sample: a correlated message carrying a sequence id.
    /// </summary>
    public interface ISimpleMessage : CorrelatedBy<Guid>
    {
        /// <summary>Sequence id assigned by the publisher.</summary>
        int MsgId { get; }
    }

    /// <summary>
    /// Concrete message published by <see cref="Program"/>.
    /// </summary>
    public class SimpleMessage : ISimpleMessage
    {
        public Guid CorrelationId { get; set; }

        public int MsgId { get; set; }
    }

    /// <summary>
    /// Consumer that records each received message id and counts total deliveries.
    /// </summary>
    public class MessageConsumer : Consumes<ISimpleMessage>.All
    {
        /// <summary>
        /// Total number of messages consumed across all consumer instances.
        /// Incremented atomically in <see cref="Consume"/>.
        /// </summary>
        public static int Count = 0;

        /// <summary>
        /// Marks the message id as received and bumps the shared counter.
        /// </summary>
        /// <param name="message">The consumed message; its MsgId indexes Program.msgReceived.</param>
        public void Consume(ISimpleMessage message)
        {
            // NOTE(review): assumes MsgId is within [0, Program.MessageCount);
            // a message with any other id would throw here — confirm the queue
            // never holds messages from other publishers.
            Program.msgReceived[message.MsgId] = true;
            System.Threading.Interlocked.Increment(ref Count);
        }
    }
}
source share