Keep the IDisposable returned by Subscribe and call Dispose on it when you want to stop the subscription.
Rx may be able to tie that IDisposable to a CancellationToken out of the box, but calling Dispose is the place to start. (You can always register a callback on the cancellation token that calls Dispose.)
Here is a short but complete example to demonstrate:
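    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading;

    class Program
    {
        static void Main(string[] args)
        {
            var instance = ThreadPoolScheduler.Instance;

            // The token source cancels itself after 5 seconds.
            var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

            // Print a timestamp every half second; keep the subscription handle.
            var disposable = Observable
                .Interval(TimeSpan.FromSeconds(0.5), instance)
                .Subscribe(_ => Console.WriteLine(DateTime.UtcNow));

            // When the token is cancelled, dispose the subscription.
            cts.Token.Register(() => disposable.Dispose());

            // Keep the process alive long enough to see the output stop.
            Thread.Sleep(10000);
        }
    }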
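As for the out-of-the-box integration: as far as I know, System.Reactive has Subscribe overloads that accept a CancellationToken and dispose the subscription once the token is cancelled. Note that these overloads return void, so there is no IDisposable to keep. Below is a minimal sketch under that assumption; the helper name CountTicksUntilCancelled and the chosen time spans are made up for illustration.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    // Hypothetical helper: counts the timer ticks received before the token cancels.
    public static int CountTicksUntilCancelled(TimeSpan period, TimeSpan cancelAfter)
    {
        var ticks = 0;

        using (var cts = new CancellationTokenSource(cancelAfter))
        {
            // Subscribe overload taking a CancellationToken: the subscription
            // is disposed when the token is cancelled. It returns void.
            Observable
                .Interval(period, ThreadPoolScheduler.Instance)
                .Subscribe(_ => Interlocked.Increment(ref ticks), cts.Token);

            // Block until the token source cancels itself.
            cts.Token.WaitHandle.WaitOne();

            // Give any callback that is already running a moment to finish.
            Thread.Sleep(period);
        }

        return Volatile.Read(ref ticks);
    }

    static void Main(string[] args)
    {
        var count = CountTicksUntilCancelled(
            TimeSpan.FromSeconds(0.5),
            TimeSpan.FromSeconds(5));

        Console.WriteLine("Ticks before cancellation: " + count);
    }
}
```

If you need to hand an IDisposable to other code while still driving it from a token, System.Reactive.Disposables also contains CancellationDisposable, which goes the other way: disposing it cancels its token.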