I have been working on getting long-running messages working with NServiceBus on an Azure transport. Based on this document, I thought I could get away with firing off the long-running process in a separate thread, marking the event handler's task as complete, and then listening for the custom OperationStarted and OperationComplete events. However, I noticed that in most cases the OperationComplete event is never received by my handlers. In fact, the only time it is received is when I publish it immediately after publishing the OperationStarted event. Any actual processing between the two somehow prevents the completion event from being received. Here is my code:
Abstract class used for long-running messages
    public abstract class LongRunningOperationHandler<TMessage> : IHandleMessages<TMessage> where TMessage : class
    {
        protected ILog _logger => LogManager.GetLogger<LongRunningOperationHandler<TMessage>>();

        public Task Handle(TMessage message, IMessageHandlerContext context)
        {
            var opStarted = new OperationStarted
            {
                OperationID = Guid.NewGuid(),
                OperationType = typeof(TMessage).FullName
            };
            var errors = new List<string>();
Testing
    public class TestLongRunningOpHandler : LongRunningOperationHandler<TestCommand>
    {
        protected override void ProcessMessage(TestCommand message, IMessageHandlerContext context)
        {
Operation events
    public sealed class OperationComplete : IEvent
    {
        public Guid OperationID { get; set; }
        public string OperationType { get; set; }
        public bool Success => !Errors?.Any() ?? true;
        public List<string> Errors { get; set; } = new List<string>();
        public DateTimeOffset CompletedOn { get; set; } = DateTimeOffset.UtcNow;
    }

    public sealed class OperationStarted : IEvent
    {
        public Guid OperationID { get; set; }
        public string OperationType { get; set; }
        public DateTimeOffset StartedOn { get; set; } = DateTimeOffset.UtcNow;
    }
Handlers
    public class OperationHandler : IHandleMessages<OperationStarted>, IHandleMessages<OperationComplete>
    {
        static ILog logger = LogManager.GetLogger<OperationHandler>();

        public Task Handle(OperationStarted message, IMessageHandlerContext context)
        {
            return PrintJsonMessage(message);
        }

        public Task Handle(OperationComplete message, IMessageHandlerContext context)
        {
I am sure that the calls to context.Publish() are being reached, because the calls to _logger.Info() print messages to the test console. I have also confirmed with breakpoints that they are hit. In my testing, anything that runs for more than 500 milliseconds prevents the OperationComplete event from being handled.
If anyone can offer suggestions as to why the OperationComplete event never reaches the handler once a significant amount of time has passed in the ProcessMessage implementation, I would be extremely grateful to hear them. Thanks!
- Update - In case someone else comes across this and is interested in what I ended up with:
After some back-and-forth with the NServiceBus developers, I decided to use a watcher saga that implements the IHandleTimeouts interface to periodically check the progress of the task. I used saga data, updated once the task had finished, to decide in the timeout handler whether the OperationComplete event should be published. This raised another problem: with In-Memory Persistence, the saga data was not persisted across threads, even when each thread locked it. To get around this, I created an interface specifically for long-running work that stores its data in memory. This interface was injected into the saga as a singleton and was used to read and write the saga data across threads for long-running operations.
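A minimal sketch of what such an in-memory singleton might look like, assuming a ConcurrentDictionary keyed by OperationID is enough to share state between the worker thread and the saga. The names OperationProgress, ILongRunningOperationStore and InMemoryLongRunningOperationStore are hypothetical and are not part of NServiceBus; how the instance gets registered as a singleton depends on the NServiceBus version and container in use.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical snapshot of one operation's state.
public sealed class OperationProgress
{
    public bool IsComplete { get; set; }
    public List<string> Errors { get; set; } = new List<string>();
}

// Hypothetical interface: written by the worker thread, read by the saga.
public interface ILongRunningOperationStore
{
    void MarkStarted(Guid operationId);
    void MarkComplete(Guid operationId, IEnumerable<string> errors);
    bool TryGet(Guid operationId, out OperationProgress progress);
    void Remove(Guid operationId);
}

// In-memory implementation; everything is lost if the process restarts.
public sealed class InMemoryLongRunningOperationStore : ILongRunningOperationStore
{
    private readonly ConcurrentDictionary<Guid, OperationProgress> _operations =
        new ConcurrentDictionary<Guid, OperationProgress>();

    public void MarkStarted(Guid operationId) =>
        _operations[operationId] = new OperationProgress();

    // Swap in a whole new snapshot rather than mutating the existing one,
    // so a concurrent reader never sees a half-updated entry.
    public void MarkComplete(Guid operationId, IEnumerable<string> errors) =>
        _operations[operationId] = new OperationProgress
        {
            IsComplete = true,
            Errors = new List<string>(errors)
        };

    public bool TryGet(Guid operationId, out OperationProgress progress) =>
        _operations.TryGetValue(operationId, out progress);

    public void Remove(Guid operationId) =>
        _operations.TryRemove(operationId, out _);
}
```

The worker thread would call MarkStarted before the work begins and MarkComplete when it ends, and the saga's timeout handler would only ever read through TryGet.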
I know that storing data in memory is not recommended, but for my needs, setting up another type of persistence (such as Azure tables) was overkill; I just want the OperationComplete event to fire under normal circumstances. If a restart happens while an operation is running, I do not need the saga data to survive. The task will be cut short anyway, and the saga timeout will publish the OperationComplete event with an error if the task runs longer than the configured maximum time.
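For anyone who wants a starting point, here is a rough sketch of how such a watcher saga could be laid out. It is not my exact code. It assumes NServiceBus 6 or later (Saga<T>, IAmStartedByMessages<T>, IHandleTimeouts<T>, RequestTimeout, MarkAsComplete) and reuses the OperationStarted and OperationComplete events from the question. The names OperationWatcherSaga, OperationWatcherSagaData, CheckOperationProgress and IOperationStatusSource are hypothetical, as are the poll interval and the maximum duration; IOperationStatusSource stands in for a narrow read-only view of whatever singleton holds the in-memory state.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical read-only view of the in-memory singleton.
public interface IOperationStatusSource
{
    // Returns true once the operation has finished; errors is empty on success.
    bool TryGetResult(Guid operationId, out List<string> errors);
}

public class OperationWatcherSagaData : ContainSagaData
{
    public Guid OperationID { get; set; }
    public string OperationType { get; set; }
    public DateTimeOffset StartedOn { get; set; }
}

// Hypothetical timeout state; it carries no data.
public class CheckOperationProgress { }

public class OperationWatcherSaga : Saga<OperationWatcherSagaData>,
    IAmStartedByMessages<OperationStarted>,
    IHandleTimeouts<CheckOperationProgress>
{
    // Assumed values; tune them to the workload.
    static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(5);
    static readonly TimeSpan MaxDuration = TimeSpan.FromMinutes(10);

    readonly IOperationStatusSource _status;

    public OperationWatcherSaga(IOperationStatusSource status)
    {
        _status = status;
    }

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OperationWatcherSagaData> mapper)
    {
        mapper.ConfigureMapping<OperationStarted>(m => m.OperationID).ToSaga(s => s.OperationID);
    }

    public Task Handle(OperationStarted message, IMessageHandlerContext context)
    {
        // Data.OperationID is filled in from the correlation mapping above.
        Data.OperationType = message.OperationType;
        Data.StartedOn = message.StartedOn;
        return RequestTimeout<CheckOperationProgress>(context, PollInterval);
    }

    public async Task Timeout(CheckOperationProgress state, IMessageHandlerContext context)
    {
        if (_status.TryGetResult(Data.OperationID, out var errors))
        {
            await Finish(context, errors);
        }
        else if (DateTimeOffset.UtcNow - Data.StartedOn > MaxDuration)
        {
            // The task ran too long (or died with the process): report failure.
            await Finish(context, new List<string> { "Operation exceeded the maximum allowed time." });
        }
        else
        {
            // Not done yet: check again after another interval.
            await RequestTimeout<CheckOperationProgress>(context, PollInterval);
        }
    }

    async Task Finish(IMessageHandlerContext context, List<string> errors)
    {
        await context.Publish(new OperationComplete
        {
            OperationID = Data.OperationID,
            OperationType = Data.OperationType,
            Errors = errors
        });
        MarkAsComplete();
    }
}
```

The point of this layout is that OperationComplete is always published from inside a message handler (the timeout), using the context that belongs to that handler, rather than from the background thread.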