How can I create an observable Rx that stops posting events when the last observer cancels the subscription?

I will create an observable (using various means) and return it to interested parties, but when they listen, I would break the observables so that it does not continue to consume resources. Another way to think of it as creating topics in a pub system. When no one else subscribes to a topic, you no longer want to keep the topic and its filtering.

+6
source share
2 answers

Rx already has an operator to suit your needs - well, two actually - Publish and RefCount .

Here's how to use them:

 IObservable xs = ... var rxs = xs.Publish().RefCount(); var sub1 = rxs.Subscribe(x => { }); var sub2 = rxs.Subscribe(x => { }); //later sub1.Dispose(); //later sub2.Dispose(); //The underlying subscription to `xs` is now disposed of. 

Simple

+10
source

If I understand your question, you want to create an observable, so that when all subscribers have placed their subscription, that is, there is no longer a subscriber, then you want to perform a cleanup function that will stop the observables from further production values. If this is what you want, you can do something like below:

 //Wrap a disposable public class WrapDisposable : IDisposable { IDisposable disp; Action act; public WrapDisposable(IDisposable _disp, Action _act) { disp = _disp; act = _act; } void IDisposable.Dispose() { act(); disp.Dispose(); } } //Observable that we want to clean up after all subs are done public static IObservable<long> GenerateObs(out Action cleanup) { cleanup = () => { Console.WriteLine("All subscribers are done. Do clean up"); }; return Observable.Interval(TimeSpan.FromSeconds(1)); } //Wrap the observable public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone) { int count = 0; return Observable.CreateWithDisposable<T>(ob => { var disp = obs.Subscribe(ob); Interlocked.Increment(ref count); return new WrapDisposable(disp,() => { if (Interlocked.Decrement(ref count) == 0) { onAllDone(); } }); }); } 

// Usage example:

 Action cleanup; var obs = GenerateObs(out cleanup); var newObs = WrapToClean(obs, cleanup); newObs.Take(6).Subscribe(Console.WriteLine); newObs.Take(5).Subscribe(Console.WriteLine); 
+1
source

Source: https://habr.com/ru/post/897797/


All Articles