How to log a failed message in MassTransit?

I am looking for a good way to log a failed message as soon as the retry limit is exceeded, without having to deal with the error queue. What I have found so far:

  • I can inherit from InMemoryInboundMessageTracker and override IsRetryLimitExceeded, but at that point there is no information about the message itself except its id.
  • I can implement IInboundMessageInterceptor and get the IConsumeContext in PreDispatch / PostDispatch, but at that point there is no information about success or failure.

So, as a workaround, I could capture the IConsumeContext in PreDispatch, put it in some kind of cache, and then pull it out of the cache in IsRetryLimitExceeded once the limit is exceeded.

The methods are called in this order: IsRetryLimitExceeded -> PreDispatch -> PostDispatch

With this order, I cannot find a good place to remove a successfully processed message from the cache.

Of course, I could use a size-limited cache, but the whole solution feels awkward.

Any thoughts on this will be appreciated.

2 answers

You can implement and configure your own message tracker on the bus, so that messages that are retried go through your implementation. You can delegate to the default tracker and simply intercept the events so that you can act on them, or you can do your own retry tracking if necessary.

MessageTrackerFactory is the delegate to configure; I think the interface is defined next to it.
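
To illustrate the "delegate to the default tracker and intercept the events" idea, here is a minimal, self-contained sketch. It deliberately does not reference MassTransit: `IMessageTracker`, `CountingTracker` and `LoggingTracker` are hypothetical stand-ins with simplified signatures, not the library's actual types, so treat it as the shape of the pattern rather than drop-in code.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for a message tracker; NOT MassTransit's real interface.
public interface IMessageTracker
{
    bool IsRetryLimitExceeded(string id, out Exception retryException);
    void IncrementRetryCount(string id, Exception exception);
    void MessageWasReceivedSuccessfully(string id);
}

// Hypothetical "default" tracker: counts failures per message id.
// Not thread-safe; a real tracker would need concurrent collections.
public sealed class CountingTracker : IMessageTracker
{
    private readonly int retryLimit;
    private readonly Dictionary<string, (int Count, Exception Last)> failures =
        new Dictionary<string, (int Count, Exception Last)>();

    public CountingTracker(int retryLimit) { this.retryLimit = retryLimit; }

    public bool IsRetryLimitExceeded(string id, out Exception retryException)
    {
        if (failures.TryGetValue(id, out var entry) && entry.Count >= retryLimit)
        {
            retryException = entry.Last;
            return true;
        }
        retryException = null;
        return false;
    }

    public void IncrementRetryCount(string id, Exception exception)
    {
        // A missing id leaves entry at its default value (0, null).
        failures.TryGetValue(id, out var entry);
        failures[id] = (entry.Count + 1, exception);
    }

    public void MessageWasReceivedSuccessfully(string id)
    {
        failures.Remove(id);
    }
}

// Decorator: forwards every call to the inner tracker and only observes
// the moment the retry limit is reported as exceeded.
public sealed class LoggingTracker : IMessageTracker
{
    private readonly IMessageTracker inner;
    private readonly Action<string, Exception> onLimitExceeded;

    public LoggingTracker(IMessageTracker inner, Action<string, Exception> onLimitExceeded)
    {
        this.inner = inner;
        this.onLimitExceeded = onLimitExceeded;
    }

    public bool IsRetryLimitExceeded(string id, out Exception retryException)
    {
        var exceeded = inner.IsRetryLimitExceeded(id, out retryException);
        if (exceeded)
            onLimitExceeded(id, retryException);
        return exceeded;
    }

    public void IncrementRetryCount(string id, Exception exception)
    {
        inner.IncrementRetryCount(id, exception);
    }

    public void MessageWasReceivedSuccessfully(string id)
    {
        inner.MessageWasReceivedSuccessfully(id);
    }
}
```

In this sketch the callback runs every time the limit is reported as exceeded, so a caller that wants exactly one log entry per message would have to de-duplicate by id. The wrapper also only sees the message id and the last exception, which is the limitation described in the question.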


I ended up with this solution:

    class MessageInterceptor : IInboundMessageInterceptor
    {
        // Remember the context of every message before it is dispatched.
        public void PreDispatch(IConsumeContext context)
        {
            MessageTracker.Register(context);
        }

        public void PostDispatch(IConsumeContext context) {}
    }

    class MessageTracker : InMemoryInboundMessageTracker
    {
        readonly Logger logger;

        static readonly ConcurrentDictionary<string, IConsumeContext> DispatchingCache =
            new ConcurrentDictionary<string, IConsumeContext>();

        public MessageTracker(int retryLimit, Logger logger)
            : base(retryLimit)
        {
            this.logger = logger;
        }

        public static void Register(IConsumeContext context)
        {
            DispatchingCache.GetOrAdd(context.MessageId, context);
        }

        // Successful messages are evicted from the cache here.
        public override void MessageWasReceivedSuccessfully(string id)
        {
            base.MessageWasReceivedSuccessfully(id);

            IConsumeContext value;
            DispatchingCache.TryRemove(id, out value);
        }

        public override bool IsRetryLimitExceeded(string id, out Exception retryException, out IEnumerable<Action> faultActions)
        {
            var result = base.IsRetryLimitExceeded(id, out retryException, out faultActions);

            IConsumeContext failed;
            if (!result || !DispatchingCache.TryRemove(id, out failed))
                return result;

            // --> log failed IConsumeContext with exception
            return true;
        }
    }

And to wire these classes into the bus:

    serviceBus = ServiceBusFactory.New(config =>
    {
        ...
        config.AddBusConfigurator(new PostCreateBusBuilderConfigurator(sb =>
        {
            var interceptorConfig = new InboundMessageInterceptorConfigurator(sb.InboundPipeline);
            interceptorConfig.Create(new MessageInterceptor());
        }));

        config.SetDefaultInboundMessageTrackerFactory(retryLimit =>
            new MessageTracker(retryLimit, LogManager.GetCurrentClassLogger()));
    });

Source: https://habr.com/ru/post/958551/