Token revocation for observable

How can I cancel the next type of Rx Observable if, when I click StartButton, the next observable is created, that is, from the stop button.

var instance = ThreadPoolScheduler.Instance; Observable.Interval(TimeSpan.FromSeconds(2), instance) .Subscribe(_ => { Console.WriteLine(DateTime.Now); // dummy event } ); 
+5
source share
2 answers

Just use one of the Subscribe overloads that accepts the CancellationToken :

 observable.Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cancellationToken); 

This simplifies the Jon Skeet example:

 using System; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Threading; class Program { static void Main(string[] args) { var instance = ThreadPoolScheduler.Instance; var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); Observable.Interval(TimeSpan.FromSeconds(0.5), instance) .Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cts.Token); Thread.Sleep(10000); } } 
+10
source

You save the IDisposable that was returned by Subscribe and call Dispose on it.

It may be possible to integrate the cancellation of Rx IDisposable with the CancellationToken out of the box, but just calling Dispose will be the beginning. (You can always simply register a continuation with a cancellation token to call dispose ...)

Here is a short but complete example to demonstrate:

 using System; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Threading; class Program { static void Main(string[] args) { var instance = ThreadPoolScheduler.Instance; var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var disposable = Observable .Interval(TimeSpan.FromSeconds(0.5), instance) .Subscribe(_ => Console.WriteLine(DateTime.UtcNow)); cts.Token.Register(() => disposable.Dispose()); Thread.Sleep(10000); } } 
+9
source

Source: https://habr.com/ru/post/1242862/


All Articles