We use Rebus as a queuing system with an Sql server. We have several recipients for different types of messages. Each message can be processed by several employees of a certain type. One message should be processed / processed only by one employee (the first who pulls it). If an employee cannot complete it for some reason, he will postpone the message using the timeout service.
If I understand correctly, it becomes a TimeoutRequest and places it in a timeout table. When it is time to restart, it becomes a TimeoutReply before it is re-queued as the original message.
The problem we are facing is that when it becomes TimeoutReply, all workers pick it up and create the original message. One original message becomes several messages (as many as there are workers) when the timeout expires.
Our Rebus setup is as follows:
"Server side":
var adapter = new BuiltinContainerAdapter();
Configure.With(adapter)
.Logging(l => l.Log4Net())
.Transport(t => t.UseSqlServerInOneWayClientMode(connectionString).EnsureTableIsCreated())
.CreateBus()
.Start();
return adapter;
"Worker's side":
_adapter = new BuiltinContainerAdapter();
Configure.With(_adapter)
.Logging(l => l.Log4Net())
.Transport(t => t.UseSqlServer(_connectionString, _inputQueue, "error")
.EnsureTableIsCreated())
.Events(x => x.AfterMessage += ((bus, exception, message) => SendWorkerFinishedJob(exception, message)))
.Events(x => x.BeforeMessage += (bus, message) => SignalWorkerStartedJob(message))
.Behavior(x => x.SetMaxRetriesFor<Exception>(0))
.Timeouts(x => x.StoreInSqlServer(_connectionString, "timeouts").EnsureTableIsCreated())
.CreateBus().Start(numberOfWorkers);
Any help in solving a problem or providing understanding is greatly appreciated!
source
share