NServiceBus events are lost when publishing in a separate thread

I have been working on getting long-running message handling to work with NServiceBus on an Azure transport. Based on this document, I thought I could get away with firing off the lengthy process in a separate thread, marking the event handler's task as complete, and then listening for custom OperationStarted and OperationComplete events. I noticed that the OperationComplete event is not received by my handlers in most cases. In fact, the only time it is received is when I publish it immediately after publishing the OperationStarted event. Any actual processing in between somehow prevents the completion event from ever being received. Here is my code:

Abstract class used for long-running messages

    public abstract class LongRunningOperationHandler<TMessage> : IHandleMessages<TMessage> where TMessage : class
    {
        protected ILog _logger => LogManager.GetLogger<LongRunningOperationHandler<TMessage>>();

        public Task Handle(TMessage message, IMessageHandlerContext context)
        {
            var opStarted = new OperationStarted
            {
                OperationID = Guid.NewGuid(),
                OperationType = typeof(TMessage).FullName
            };
            var errors = new List<string>();

            // Fire off the long running task in a separate thread
            Task.Run(() =>
            {
                try
                {
                    _logger.Info($"Operation Started: {JsonConvert.SerializeObject(opStarted)}");
                    context.Publish(opStarted);
                    ProcessMessage(message, context);
                }
                catch (Exception ex)
                {
                    errors.Add(ex.Message);
                }
                finally
                {
                    var opComplete = new OperationComplete
                    {
                        OperationType = typeof(TMessage).FullName,
                        OperationID = opStarted.OperationID,
                        Errors = errors
                    };
                    context.Publish(opComplete);
                    _logger.Info($"Operation Complete: {JsonConvert.SerializeObject(opComplete)}");
                }
            });

            return Task.CompletedTask;
        }

        protected abstract void ProcessMessage(TMessage message, IMessageHandlerContext context);
    }

Testing

    public class TestLongRunningOpHandler : LongRunningOperationHandler<TestCommand>
    {
        protected override void ProcessMessage(TestCommand message, IMessageHandlerContext context)
        {
            // If I remove this, or lessen it to something like 200 milliseconds, the
            // OperationComplete event gets handled
            Thread.Sleep(1000);
        }
    }

Operation events

    public sealed class OperationComplete : IEvent
    {
        public Guid OperationID { get; set; }
        public string OperationType { get; set; }
        public bool Success => !Errors?.Any() ?? true;
        public List<string> Errors { get; set; } = new List<string>();
        public DateTimeOffset CompletedOn { get; set; } = DateTimeOffset.UtcNow;
    }

    public sealed class OperationStarted : IEvent
    {
        public Guid OperationID { get; set; }
        public string OperationType { get; set; }
        public DateTimeOffset StartedOn { get; set; } = DateTimeOffset.UtcNow;
    }

Handlers

    public class OperationHandler : IHandleMessages<OperationStarted>, IHandleMessages<OperationComplete>
    {
        static ILog logger = LogManager.GetLogger<OperationHandler>();

        public Task Handle(OperationStarted message, IMessageHandlerContext context)
        {
            return PrintJsonMessage(message);
        }

        public Task Handle(OperationComplete message, IMessageHandlerContext context)
        {
            // This is not hit if ProcessMessage takes too long
            return PrintJsonMessage(message);
        }

        private Task PrintJsonMessage<T>(T message) where T : class
        {
            var msgObj = new
            {
                Message = typeof(T).Name,
                Data = message
            };
            logger.Info(JsonConvert.SerializeObject(msgObj, Formatting.Indented));
            return Task.CompletedTask;
        }
    }

I am sure that the calls to context.Publish() are being hit, because the _logger.Info() calls print their messages to my test console. I have also confirmed this with breakpoints. In my testing, anything that runs for more than 500 milliseconds prevents the OperationComplete event from being handled.

If anyone can suggest why the OperationComplete event does not reach the handler once a significant amount of time has passed in the ProcessMessage implementation, I would be very grateful to hear it. Thanks!

- Update - In case someone else comes across this and is interested in what I ended up with:

After some exchanges with the NServiceBus developers, I decided to use a watchdog saga that implements the IHandleTimeouts interface to periodically check the progress of the task. I used saga data, updated once the task finished, to determine whether the OperationComplete event should be published in the timeout handler. This presented another problem: when using In-Memory Persistence, the saga data was not persisted across threads, even when it was locked by each thread. To get around this, I created an interface specifically for long-running work that keeps its data in memory. This interface was injected into the saga as a singleton and was thus used to read and write saga data across threads for long-running operations.

I know that in-memory storage is not recommended, but for my needs, setting up another type of persistence (such as Azure tables) was overkill; I just want the OperationComplete event to fire under normal circumstances. If a restart happens during an operation, I do not need the saga data to survive. The task will be cut short anyway, and the saga timeout will take care of publishing the OperationComplete event with an error if the task runs longer than the set maximum time.
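For readers who want a starting point, a minimal sketch of such a watchdog saga might look roughly like this. It is not the code I actually ended up with: `IOperationProgressStore`, `CheckProgress`, `OperationWatchdogData`, the five-second interval and the ten-minute limit are all hypothetical names and values chosen for illustration, and `OperationStarted` / `OperationComplete` are the events shown above. It assumes the background task writes its result into the store instead of publishing anything itself.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical in-memory store, registered as a singleton; the background task writes to it.
public interface IOperationProgressStore
{
    // Returns true once the operation has finished; errors is empty on success.
    bool TryGetErrors(Guid operationId, out List<string> errors);
}

// Hypothetical timeout message; it carries no state.
public class CheckProgress { }

public class OperationWatchdogData : ContainSagaData
{
    public Guid OperationID { get; set; }
    public string OperationType { get; set; }
    public DateTimeOffset StartedOn { get; set; }
}

public class OperationWatchdogSaga : Saga<OperationWatchdogData>,
    IAmStartedByMessages<OperationStarted>,
    IHandleTimeouts<CheckProgress>
{
    // Assumed values, not taken from the original setup.
    static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(5);
    static readonly TimeSpan MaxDuration = TimeSpan.FromMinutes(10);

    readonly IOperationProgressStore _progress;

    public OperationWatchdogSaga(IOperationProgressStore progress) => _progress = progress;

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OperationWatchdogData> mapper)
    {
        mapper.ConfigureMapping<OperationStarted>(m => m.OperationID).ToSaga(s => s.OperationID);
    }

    public Task Handle(OperationStarted message, IMessageHandlerContext context)
    {
        Data.OperationType = message.OperationType;
        Data.StartedOn = message.StartedOn;
        // Ask to be woken up later to check on the task.
        return RequestTimeout<CheckProgress>(context, PollInterval);
    }

    public async Task Timeout(CheckProgress state, IMessageHandlerContext context)
    {
        var finished = _progress.TryGetErrors(Data.OperationID, out var errors);
        var expired = DateTimeOffset.UtcNow - Data.StartedOn > MaxDuration;

        if (!finished && !expired)
        {
            // Still running and within the limit: check again later.
            await RequestTimeout<CheckProgress>(context, PollInterval);
            return;
        }

        var result = new List<string>(errors ?? new List<string>());
        if (!finished)
        {
            result.Add("Operation exceeded the maximum allowed time.");
        }

        // Published from a valid handler context, unlike the background thread in the question.
        await context.Publish(new OperationComplete
        {
            OperationID = Data.OperationID,
            OperationType = Data.OperationType,
            Errors = result
        });
        MarkAsComplete();
    }
}
```

The point of the design is that every Publish happens inside a message or timeout handler, where the context is still valid.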

1 answer

The reason is that, if ProcessMessage is fast enough, you may get to use the current context before it is invalidated, for example by being disposed.

When you return from Handle successfully, you are telling NServiceBus "I'm done with this message," so it may do whatever it wants with the context, for example invalidate it. In a background processor you need an endpoint instance, not a message context.
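As a rough illustration of that last point, here is a minimal sketch in which the background task publishes through the endpoint's `IMessageSession` (which `IEndpointInstance` implements) rather than through the handler's context. `EndpointHolder` and `TestCommandHandler` are hypothetical names, the one-second delay stands in for the real work, and `TestCommand`, `OperationStarted` and `OperationComplete` are the types from the question. It only addresses the invalid context; the work is still neither persisted nor retried.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical holder, assumed to be assigned once at startup, e.g.
// EndpointHolder.Session = await Endpoint.Start(endpointConfiguration);
public static class EndpointHolder
{
    public static IMessageSession Session { get; set; }
}

public class TestCommandHandler : IHandleMessages<TestCommand>
{
    public Task Handle(TestCommand message, IMessageHandlerContext context)
    {
        var operationId = Guid.NewGuid();
        var operationType = typeof(TestCommand).FullName;
        var session = EndpointHolder.Session;

        // The background task deliberately does not capture `context`,
        // which may no longer be usable once Handle has returned.
        _ = Task.Run(async () =>
        {
            var errors = new List<string>();
            try
            {
                await session.Publish(new OperationStarted
                {
                    OperationID = operationId,
                    OperationType = operationType
                });

                // Placeholder for the real long-running work.
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
            catch (Exception ex)
            {
                errors.Add(ex.Message);
            }

            await session.Publish(new OperationComplete
            {
                OperationID = operationId,
                OperationType = operationType,
                Errors = errors
            });
        });

        // The incoming message is considered consumed as soon as this returns.
        return Task.CompletedTask;
    }
}
```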

By the time your new task starts running, you do not know whether Handle has already returned, so you should simply assume that the message has already been consumed and is therefore unrecoverable. If errors occur in your separate task, you cannot retry it.

Avoid long-running processes without persistence. In the sample you mentioned, there is a server that stores a work item from the message, and a processor that polls this store for work items. It may not be ideal if you scale out the processors, but it will not lose messages.

To avoid constant polling, merge the server and the processor: poll unconditionally once at startup, and in Handle schedule a polling task. Make sure this task polls only if no other polling task is running; otherwise it may end up worse than constant polling. You can use a semaphore to control this.
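One way to sketch the semaphore guard, assuming a hypothetical `SinglePoller` class and a `pollOnce` delegate that drains the work-item store (neither is part of NServiceBus or of the sample):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: lets at most one poll run at a time and skips the rest.
public sealed class SinglePoller
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private readonly Func<Task> _pollOnce;

    // pollOnce is assumed to read and process all pending work items from the store.
    public SinglePoller(Func<Task> pollOnce)
    {
        _pollOnce = pollOnce;
    }

    // Returns false without polling when another poll is already in progress.
    public async Task<bool> TryPollAsync()
    {
        // Wait(0) never blocks: it returns false if the semaphore is already taken.
        if (!_gate.Wait(0))
        {
            return false;
        }

        try
        {
            await _pollOnce();
            return true;
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

The endpoint would call `TryPollAsync()` once at startup, and Handle would store the work item and then start `TryPollAsync()` without awaiting it. Note that an item stored just as a running poll is finishing may sit in the store until the next trigger, so the delegate should re-check the store before it returns if that matters.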

To scale out, you need more servers. You have to measure whether the cost of N processors polling is higher than that of dispatching to N servers in round-robin fashion, for some N, to know which approach actually performs better. In practice, polling is good enough for low N.

Adapting the sample to multiple processors may require less deployment and configuration effort: you simply add or remove processors, whereas adding or removing servers requires changing their endpoints in every place (for example, configuration files) that points to them.

Another approach would be to break the long process down into steps. NServiceBus has sagas for this. It is an approach usually applied to a known or bounded number of steps. For an unknown number of steps it is still feasible, although some may consider it an abuse of the seemingly intended purpose of sagas.


Source: https://habr.com/ru/post/1275633/

