I took advantage of this solution:
// MessageInterceptor: inbound interceptor whose PreDispatch hands every consume context to
// MessageTracker.Register; PostDispatch intentionally does nothing.
//
// MessageTracker: extends InMemoryInboundMessageTracker and keeps a static, process-wide
// ConcurrentDictionary (DispatchingCache) mapping a message id to the IConsumeContext that is
// currently being dispatched.
//   - Register: stores the context under context.MessageId via GetOrAdd, so an entry that already
//     exists for that id is kept rather than replaced.
//   - MessageWasReceivedSuccessfully: calls the base implementation, then removes the cached
//     context for that id (the removed value is discarded).
//   - IsRetryLimitExceeded: calls the base implementation; when the base result is false, or when
//     no cached context can be removed for the id, it returns the base result unchanged.
// NOTE(review): this excerpt is cut off right after that early return -- the rest of
// IsRetryLimitExceeded (what is done with the removed `failed` context and with `logger`) and the
// closing braces of the class are not visible here.
class MessageInterceptor: IInboundMessageInterceptor { public void PreDispatch(IConsumeContext context) { MessageTracker.Register(context); } public void PostDispatch(IConsumeContext context) {} } class MessageTracker: InMemoryInboundMessageTracker { readonly Logger logger; static readonly ConcurrentDictionary<string, IConsumeContext> DispatchingCache = new ConcurrentDictionary<string, IConsumeContext>(); public MessageTracker(int retryLimit, Logger logger) : base(retryLimit) { this.logger = logger; } public static void Register(IConsumeContext context) { DispatchingCache.GetOrAdd(context.MessageId, context); } public override void MessageWasReceivedSuccessfully(string id) { base.MessageWasReceivedSuccessfully(id); IConsumeContext value; DispatchingCache.TryRemove(id, out value); } public override bool IsRetryLimitExceeded(string id, out Exception retryException, out IEnumerable<Action> faultActions) { var result = base.IsRetryLimitExceeded(id, out retryException, out faultActions); IConsumeContext failed; if (!result || !DispatchingCache.TryRemove(id, out failed)) return result;
And to connect these classes to the service bus:
// Creates the service bus and wires in the custom interceptor and message tracker.
serviceBus = ServiceBusFactory.New(configurator =>
{
    // The original snippet elides the remaining bus configuration at this point.
    ...

    // Once the bus has been built, create a MessageInterceptor on its inbound pipeline.
    configurator.AddBusConfigurator(new PostCreateBusBuilderConfigurator(bus =>
    {
        new InboundMessageInterceptorConfigurator(bus.InboundPipeline)
            .Create(new MessageInterceptor());
    }));

    // Use MessageTracker as the default inbound message tracker; it receives the retry limit
    // passed to the factory and a logger obtained from LogManager.GetCurrentClassLogger().
    configurator.SetDefaultInboundMessageTrackerFactory(
        limit => new MessageTracker(limit, LogManager.GetCurrentClassLogger()));
});
source share