Well, here is a test suite (using nuget Microsoft.Reactive.Testing ):
var ts = new TestScheduler(); var source = ts.CreateHotObservable<char>( new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')), new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')), new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')), new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('D')), new Recorded<Notification<char>>(550.MsTicks(), Notification.CreateOnNext('E')), new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('F')), new Recorded<Notification<char>>(760.MsTicks(), Notification.CreateOnNext('G')) ); var target = source.ThrottleSubsequent(TimeSpan.FromMilliseconds(150), ts); var expectedResults = ts.CreateHotObservable<char>( new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')), new Recorded<Notification<char>>(450.MsTicks(), Notification.CreateOnNext('B')), new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')), new Recorded<Notification<char>>(910.MsTicks(), Notification.CreateOnNext('G')) ); var observer = ts.CreateObserver<char>(); target.Subscribe(observer); ts.Start(); ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
and using
public static class TestingHelpers { public static long MsTicks(this int i) { return TimeSpan.FromMilliseconds(i).Ticks; } }
It seems to pass. If you want to reduce it, you can include it in this:
public static IObservable<TSource> ThrottleSubsequent2<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler) { return source.Publish(_source => _source .Window(() => _source .Select(x => Observable.Interval(dueTime, scheduler)) .Switch() )) .Publish(cooldownWindow => Observable.Merge( cooldownWindow .SelectMany(o => o.Take(1)), cooldownWindow .SelectMany(o => o.Skip(1)) .Throttle(dueTime, scheduler) ) ); }
EDIT
Publish forcibly distributes the subscription. If you have a bad (or expensive) source observed with the side effects of a subscription, Publish guarantees that you will only subscribe once. Here is an example where Publish helps:
void Main() { var source = UglyRange(10); var target = source .SelectMany(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10 * i))) .ThrottleSubsequent2(TimeSpan.FromMilliseconds(70), Scheduler.Default) //Works with ThrottleSubsequent2, fails with ThrottleSubsequent .Subscribe(i => Console.WriteLine(i)); } static int counter = 0; public IObservable<int> UglyRange(int limit) { var uglySource = Observable.Create<int>(o => { if (counter++ == 0) { Console.WriteLine("Ugly observable should only be created once."); Enumerable.Range(1, limit).ToList().ForEach(i => o.OnNext(i)); } else { Console.WriteLine($"Ugly observable should only be created once. This is the {counter}th time created."); o.OnError(new Exception($"observable invoked {counter} times.")); } return Disposable.Empty; }); return uglySource; }