I have an IEnumerable<T> where each T lets me derive an associated timestamp.
I would like to convert this to an IObservable<T>, but I want to use the HistoricalScheduler so that notifications occur according to the derived timestamp. That would let me use the built-in Rx operators for windows, sliding windows, etc., which is what I ultimately want to do.
Many of the suggestions on how to do this involve using Generate(). However, that approach throws a StackOverflowException. For instance:
    static void Main(string[] args)
    {
        var enumerable = Enumerable.Range(0, 2000000);
        var now = DateTimeOffset.Now;
        var scheduler = new HistoricalScheduler(now);

        var observable = Observable.Generate(
            enumerable.GetEnumerator(),
            e => e.MoveNext(),
            e => e,
            e => Timestamped.Create(e, now.AddTicks(e.Current)),
            e => now.AddTicks(e.Current),
            scheduler);

        var s2 = observable.Count().Subscribe(eventCount =>
            Console.WriteLine("Found {0} events @ {1}", eventCount, scheduler.Now));

        scheduler.Start();
        s2.Dispose();
        Console.ReadLine();
    }
This will cause a stack overflow.
The standard ToObservable() method cannot be used because, although it allows you to specify a custom scheduler, it does not provide any mechanism for controlling when the resulting notifications are scheduled on that scheduler.
How do I convert an IEnumerable to an IObservable whose notifications are scheduled at explicit times?
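To illustrate what I mean about ToObservable(), here is a minimal sketch. It assumes the System.Reactive package is referenced; the class name ToObservableTimingDemo and the helper DeliveryTimes are hypothetical names made up for this illustration. It records the scheduler's virtual time at each notification, and as far as I can tell the clock does not advance between items, because nothing tells the scheduler when each item is due.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class ToObservableTimingDemo
{
    // Hypothetical helper: records the virtual time at which each item is delivered.
    public static List<DateTimeOffset> DeliveryTimes(int count, DateTimeOffset start)
    {
        var scheduler = new HistoricalScheduler(start);
        var times = new List<DateTimeOffset>();

        // ToObservable accepts a scheduler, but takes no per-item due time.
        using (Enumerable.Range(0, count)
            .ToObservable(scheduler)
            .Subscribe(_ => times.Add(scheduler.Now)))
        {
            // Run all work queued on the virtual-time scheduler.
            scheduler.Start();
        }

        return times;
    }

    static void Main()
    {
        var start = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
        foreach (var t in DeliveryTimes(5, start))
        {
            // Offset of each delivery from the start time, in ticks.
            Console.WriteLine(t.Ticks - start.Ticks);
        }
    }
}
```

What I am after is the same shape of call, but with each item delivered at the virtual time given by its own timestamp.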
UPDATE
I tried using Asti's code in the following test:
    static void Main(string[] args)
    {
        var enumerable = Enumerable.Range(0, 2000000);
        var now = DateTimeOffset.Now;
        var series = enumerable.Select(i => Timestamped.Create(i, now.AddSeconds(i)));
        var ticks = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => now.AddSeconds(i));
        var scheduler = new HistoricalScheduler(now);

        Playback(series, ticks, scheduler).Subscribe(Console.WriteLine);
        scheduler.Start();
    }
However, it throws an ArgumentOutOfRangeException:
    Specified argument was out of the range of valid values.
    Parameter name: time
       at System.Reactive.Concurrency.VirtualTimeSchedulerBase`2.AdvanceTo(TAbsolute time)
       at System.Reactive.AnonymousSafeObserver`1.OnNext(T value)
       at System.Reactive.Linq.ObservableImpl.Select`2._.OnNext(TSource value)
       at System.Reactive.Linq.ObservableImpl.Timer.TimerImpl.Tick(Int64 count)
       at System.Reactive.Concurrency.DefaultScheduler.<>c__DisplayClass7_0`1.<SchedulePeriodic>b__1()
       at System.Reactive.Concurrency.AsyncLock.Wait(Action action)
       at System.Reactive.Concurrency.DefaultScheduler.<>c__DisplayClass7_0`1.<SchedulePeriodic>b__0()
       at System.Reactive.Concurrency.ConcurrencyAbstractionLayerImpl.PeriodicTimer.Tick(Object state)
       at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
       at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
       at System.Threading.TimerQueueTimer.CallCallback()
       at System.Threading.TimerQueueTimer.Fire()
       at System.Threading.TimerQueue.FireNextTimers()
       at System.Threading.TimerQueue.AppDomainTimerCallback()