How to create an IObservable<T> that reads from an MSMQ message queue?

I'm moving our e-mail system out of our ASP.NET site, which used to send e-mail immediately, into a request-processing system running in a separate service, to reduce the load on the website. I'm trying to build it around a set of interfaces so that I can swap implementations if I want, but initially it will be based on a message queue (MSMQ): the site sends requests to the queue, and the service listens for incoming requests and processes them. I currently have the following interfaces:

    // Sends one or more requests to be processed somehow
    public interface IRequestSender
    {
        void Send(IEnumerable<Request> requests);
    }

    // Listens for incoming requests and passes them to an observer to do the real work
    public interface IRequestListener : IObservable<Request>
    {
        void Start();
        void Stop();
    }

    // Processes a request given to it by an IRequestListener
    public interface IRequestProcessor : IObserver<Request>
    {
    }

You will notice that the listener and the processor use the observer pattern, since it seems to me to be the best fit.

My problem is how to write an implementation of IRequestListener that reads from MSMQ. Basically, how do I create a suitable IObservable<T>?

The first option I found is to create an IObservable<T> from scratch, based on the example in the MSDN documentation, but this seems like a lot of plumbing work.
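To give a sense of the plumbing the from-scratch route involves, here is a minimal sketch that uses only the base class library. It is not the MSDN sample. The names `TryReceive<T>`, `PollingListener<T>` and `ActionObserver<T>` are hypothetical, and the queue is hidden behind a delegate so that the sketch does not depend on MSMQ. `Start` and `Stop` are assumed to be called from a single controlling thread, and not from inside an observer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical delegate: returns true and sets item when a message was available.
public delegate bool TryReceive<T>(out T item);

// Hand-rolled hot observable that pumps items from a receive delegate.
public sealed class PollingListener<T> : IObservable<T>
{
    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private readonly TryReceive<T> _tryReceive;
    private Thread _worker;
    private volatile bool _running;

    public PollingListener(TryReceive<T> tryReceive)
    {
        _tryReceive = tryReceive ?? throw new ArgumentNullException(nameof(tryReceive));
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException(nameof(observer));
        lock (_gate) { _observers.Add(observer); }
        return new Unsubscriber(this, observer);
    }

    public void Start()
    {
        if (_running) return;
        _running = true;
        _worker = new Thread(Pump) { IsBackground = true };
        _worker.Start();
    }

    public void Stop()
    {
        _running = false;
        _worker?.Join();   // wait for the pump loop to exit
        _worker = null;
    }

    private void Pump()
    {
        while (_running)
        {
            if (_tryReceive(out T item))
                Publish(item);
            else
                Thread.Sleep(10);   // nothing available; back off briefly
        }
    }

    private void Publish(T item)
    {
        IObserver<T>[] snapshot;
        lock (_gate) { snapshot = _observers.ToArray(); }
        foreach (var observer in snapshot) observer.OnNext(item);
    }

    // Removes the observer from the list when the subscription is disposed.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly PollingListener<T> _owner;
        private readonly IObserver<T> _observer;

        public Unsubscriber(PollingListener<T> owner, IObserver<T> observer)
        {
            _owner = owner;
            _observer = observer;
        }

        public void Dispose()
        {
            lock (_owner._gate) { _owner._observers.Remove(_observer); }
        }
    }
}

// Hypothetical helper so a lambda can act as an observer without Rx.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public ActionObserver(Action<T> onNext) { _onNext = onNext; }
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With a real queue, the delegate would presumably wrap `MessageQueue.Receive(TimeSpan)` and return false when that call throws a `MessageQueueException` whose error code is `MessageQueueErrorCode.IOTimeout`. Error forwarding through `OnError` is left out of the sketch.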

Another option is to use Reactive Extensions (Rx), which seems to be designed to simplify creating observables. The closest examples I found of using Rx with MSMQ are the following pages:

But I'm not sure how I can apply these examples to my IRequestListener interface.

Any other ideas are welcome too, even ones that change the basic design, if they are a better fit.

1 answer

At first I used FromAsyncPattern, but I ended up writing a class for it because that handles timeouts and poison messages better. Once you start listening, queues are hot observables anyway. You could also use Observable.Defer instead of Start/Stop to make it more Rx-like.

Here is the basic implementation of QueueObservable. You start it simply by calling ListenReceive.

    Subject<T> Subject = new Subject<T>();

    protected void ListenReceive()
    {
        // Start an asynchronous receive; OnReceive runs when it completes.
        Queue.BeginReceive(MessageQueue.InfiniteTimeout, null, OnReceive);
    }

    protected void OnReceive(IAsyncResult ar)
    {
        Message message = null;
        try
        {
            message = Queue.EndReceive(ar);
        }
        catch (TimeoutException)
        {
            // retry?
        }

        if (message != null)
            Subject.OnNext((T) message.Body);

        Thread.Yield();

        // Re-arm the receive unless the observable has been disposed.
        if (!IsDisposed)
            ListenReceive();
    }

    public IObservable<T> AsObservable()
    {
        return Subject;
    }

Source: https://habr.com/ru/post/1395385/

