If I understand your question, you want to create an observable, so that when all subscribers have placed their subscription, that is, there is no longer a subscriber, then you want to perform a cleanup function that will stop the observables from further production values. If this is what you want, you can do something like below:
//Wrap a disposable public class WrapDisposable : IDisposable { IDisposable disp; Action act; public WrapDisposable(IDisposable _disp, Action _act) { disp = _disp; act = _act; } void IDisposable.Dispose() { act(); disp.Dispose(); } } //Observable that we want to clean up after all subs are done public static IObservable<long> GenerateObs(out Action cleanup) { cleanup = () => { Console.WriteLine("All subscribers are done. Do clean up"); }; return Observable.Interval(TimeSpan.FromSeconds(1)); } //Wrap the observable public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone) { int count = 0; return Observable.CreateWithDisposable<T>(ob => { var disp = obs.Subscribe(ob); Interlocked.Increment(ref count); return new WrapDisposable(disp,() => { if (Interlocked.Decrement(ref count) == 0) { onAllDone(); } }); }); }
// Usage example:
Action cleanup; var obs = GenerateObs(out cleanup); var newObs = WrapToClean(obs, cleanup); newObs.Take(6).Subscribe(Console.WriteLine); newObs.Take(5).Subscribe(Console.WriteLine);
source share