Revealing ConcurrentQueue <T> as IObservable <T>?

I wondered if it is possible to use a queue (in particular, ConcurrentQueue) as an IObservable source? Sort of:

Queue = new ConcurrentQueue<IMessage>(); var xs = Queue.AsEnumerable().ToObservable(); xs.Subscribe((IMessage msg) => { Console.WriteLine("Msg :" + msg.subject); }); 

I guess this makes no sense, because nothing is canceled. I am trying to implement a non-blocking process that can subscribe to "messages" that will be sent to observers, therefore, using a queue. I'm sure I can do it with RX, but I can't seem to see this!

I would be interested in any suggestions on how this can be implemented. Thanks!

+4
source share
1 answer

You are right, the conversion of the queue (parallel or simple, it does not matter) will only list it, but not de-queue. A real implementation is possible, but more complicated - see the link to a similar question I asked on the RX forum (which is still the best source of information about RX compared to StackOverflow):

How to implement a single queue of consumer manufacturers using RX?

+2
source

Source: https://habr.com/ru/post/1301284/


All Articles