RX: Perform an action when starting or deleting a subscription?

I would like to track the number of active subscribers to my IObservable? I thought I could implement this by incrementing / decrementing the counter whenever "someone" calls up "Subscribe / Recycle".

How can i do this? Or is there a better way to do this?

Apparently this is done using RefCount internally, but the subscription counter is not displayed.

thanks

+5
source share
2 answers

The easiest way to do this is to wrap the Observable in Observable.Create:

IObservable<string> myObs; var returnObservable = Observable.Create<string>(subj => { // TODO: Write code to do stuff on Sub var disp = myObs.Subscribe(subj); return Disposable.Create(() => { disp.Dispose(); // TODO: Write code to do stuff in unsub. }); }); 
+6
source

Thank you, that was what I needed. This can be turned into an operator as follows:

 public static IObservable<TSource> OnSubscribe<TSource>(this IObservable<TSource> source, Action onSubscribe, Action onDispose) { return Observable .Create<TSource>(observer => { onSubscribe?.Invoke(); var subscription = source.Subscribe(observer); return () => { subscription.Dispose(); onDispose?.Invoke(); }; }); } 
+4
source

Source: https://habr.com/ru/post/1401103/


All Articles