How to convert an IEnumerable to an IObservable using the HistoricalScheduler

I have an IEnumerable<T> where T lets me derive an appropriate timestamp for each item.

I would like to convert this to an IObservable<T>, but I want to use the HistoricalScheduler so that notifications occur according to the derived timestamp. That would let me use the built-in Rx methods for windowing, sliding windows, etc., which is what I am ultimately trying to do.

Many of the suggestions on how to do this involve using Generate(). However, this method causes a StackOverflowException. For instance:

    static void Main(string[] args)
    {
        var enumerable = Enumerable.Range(0, 2000000);
        var now = DateTimeOffset.Now;
        var scheduler = new HistoricalScheduler(now);
        var observable = Observable.Generate(
            enumerable.GetEnumerator(),
            e => e.MoveNext(),
            e => e,
            e => Timestamped.Create(e, now.AddTicks(e.Current)),
            e => now.AddTicks(e.Current),
            scheduler);
        var s2 = observable.Count().Subscribe(eventCount =>
            Console.WriteLine("Found {0} events @ {1}", eventCount, scheduler.Now));
        scheduler.Start();
        s2.Dispose();
        Console.ReadLine();
    }

This will cause a stack overflow.

The standard ToObservable() method cannot be used because, although it accepts a custom scheduler, it provides no mechanism for controlling when the resulting notifications are scheduled on that scheduler.

How do I convert an IEnumerable to an IObservable whose notifications are scheduled at specified times?

UPDATE

I tried using Asti's code in the following test:

    static void Main(string[] args)
    {
        var enumerable = Enumerable.Range(0, 2000000);
        var now = DateTimeOffset.Now;
        var series = enumerable.Select(i => Timestamped.Create(i, now.AddSeconds(i)));
        var ticks = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => now.AddSeconds(i));
        var scheduler = new HistoricalScheduler(now);
        Playback(series, ticks, scheduler).Subscribe(Console.WriteLine);
        scheduler.Start();
    }

However, it throws an ArgumentOutOfRangeException :

    Specified argument was out of the range of valid values.
    Parameter name: time
       at System.Reactive.Concurrency.VirtualTimeSchedulerBase`2.AdvanceTo(TAbsolute time)
       at System.Reactive.AnonymousSafeObserver`1.OnNext(T value)
       at System.Reactive.Linq.ObservableImpl.Select`2._.OnNext(TSource value)
       at System.Reactive.Linq.ObservableImpl.Timer.TimerImpl.Tick(Int64 count)
       at System.Reactive.Concurrency.DefaultScheduler.<>c__DisplayClass7_0`1.<SchedulePeriodic>b__1()
       at System.Reactive.Concurrency.AsyncLock.Wait(Action action)
       at System.Reactive.Concurrency.DefaultScheduler.<>c__DisplayClass7_0`1.<SchedulePeriodic>b__0()
       at System.Reactive.Concurrency.ConcurrencyAbstractionLayerImpl.PeriodicTimer.Tick(Object state)
       at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
       at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
       at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
       at System.Threading.TimerQueueTimer.CallCallback()
       at System.Threading.TimerQueueTimer.Fire()
       at System.Threading.TimerQueue.FireNextTimers()
       at System.Threading.TimerQueue.AppDomainTimerCallback()
2 answers

Let's write an operator that plays back an ordered sequence of events on a HistoricalScheduler, with time advancing according to a specified observable.

    public static IObservable<T> Playback<T>(
        this IEnumerable<Timestamped<T>> enumerable,
        IObservable<DateTimeOffset> ticks,
        HistoricalScheduler scheduler = default(HistoricalScheduler))
    {
        return Observable.Create<T>(observer =>
        {
            scheduler = scheduler ?? new HistoricalScheduler();

            //create enumerator of sequence - we're going to iterate through it manually
            var enumerator = enumerable.GetEnumerator();

            //set scheduler time for every incoming value of ticks
            var timeD = ticks.Subscribe(scheduler.AdvanceTo);

            //declare an iterator
            Action scheduleNext = default(Action);
            scheduleNext = () =>
            {
                //move
                if (!enumerator.MoveNext())
                {
                    //no more items
                    //sequence has completed
                    observer.OnCompleted();
                    return;
                }

                //current item of enumerable sequence
                var current = enumerator.Current;

                //schedule the item to run at the timestamp specified
                scheduler.ScheduleAbsolute(current.Timestamp, () =>
                {
                    //push the value forward
                    observer.OnNext(current.Value);

                    //schedule the next item
                    scheduleNext();
                });
            };

            //start the process by scheduling the first item
            scheduleNext();

            //dispose the enumerator and subscription to ticks
            return new CompositeDisposable(timeD, enumerator);
        });
    }

Porting the previous example,

    var enumerable = Enumerable.Range(0, 20000000);
    var now = DateTimeOffset.Now;
    var series = enumerable.Select(i => Timestamped.Create(i, now.AddSeconds(i)));
    var ticks = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => now.AddSeconds(i));
    series.Playback(ticks).Subscribe(Console.WriteLine);

The enumerable is read lazily, and the clock is driven by a simple Interval observable. Reducing the interval makes the playback run faster.
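A note on the ArgumentOutOfRangeException from the question's update. The stack trace points at AdvanceTo, which rejects a time earlier than the scheduler's current clock. A plausible reading is that scheduler.Start() had already run the virtual clock far ahead, so the real-time Interval ticks then asked it to move backwards. The following minimal sketch reproduces just that behaviour in isolation. The class and method names (ClockDemo, RewindThrows) and the fixed start date are hypothetical, chosen only for illustration.

```csharp
using System;
using System.Reactive.Concurrency;

public static class ClockDemo
{
    // Returns true if asking the scheduler to move its clock backwards throws.
    public static bool RewindThrows()
    {
        var start = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
        var scheduler = new HistoricalScheduler(start);

        // One piece of work, ten virtual seconds in the future.
        scheduler.ScheduleAbsolute(start.AddSeconds(10), () => { });

        // Start() runs everything in the queue, so the clock ends up at start + 10s.
        scheduler.Start();

        try
        {
            // This target is now in the scheduler's past.
            scheduler.AdvanceTo(start.AddSeconds(1));
            return false;
        }
        catch (ArgumentOutOfRangeException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine(RewindThrows());
    }
}
```

If that reading is right, the clock should be driven either by the ticks observable or by Start(), but not by both at once.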


Phil, I think you will run into problems regardless if you try to emit a notification on every tick. Rx just won't be able to keep up.

I see what @asti is doing here, but I think it can be simplified by using what Paul already had (IEnumerable<Timestamped<T>>):

    public static IObservable<T> Playback<T>(
        this IEnumerable<Timestamped<T>> enumerable,
        IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            var enumerator = enumerable.GetEnumerator();

            //declare a recursive function
            Action<Action> scheduleNext = (self) =>
            {
                //move
                if (!enumerator.MoveNext())
                {
                    //no more items (or we have been disposed)
                    //sequence has completed
                    scheduler.Schedule(() => observer.OnCompleted());
                    return;
                }

                //current item of enumerable sequence
                var current = enumerator.Current;

                //schedule the item to run at the timestamp specified
                scheduler.Schedule(current.Timestamp, () =>
                {
                    //push the value forward
                    observer.OnNext(current.Value);

                    //Recursively call self (via the scheduler API)
                    self();
                });
            };

            //start the process by scheduling the recursive calls.
            // return the scheduled handle to allow disposal.
            var scheduledTask = scheduler.Schedule(scheduleNext);
            return StableCompositeDisposable.Create(scheduledTask, enumerator);
        });
    }

It is also scheduler-agnostic, so it will work with any scheduler.
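As a rough usage sketch, this is how the operator might be driven with a HistoricalScheduler. The Playback method is repeated (without comments) only so the example compiles on its own. The PlaybackDemo class, the ValuesAfter helper, the five-item series and the fixed start date are all hypothetical names and values introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class PlaybackDemo
{
    // Same operator as in this answer, repeated so the sketch is self-contained.
    public static IObservable<T> Playback<T>(
        this IEnumerable<Timestamped<T>> enumerable,
        IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            var enumerator = enumerable.GetEnumerator();
            Action<Action> scheduleNext = self =>
            {
                if (!enumerator.MoveNext())
                {
                    scheduler.Schedule(() => observer.OnCompleted());
                    return;
                }
                var current = enumerator.Current;
                scheduler.Schedule(current.Timestamp, () =>
                {
                    observer.OnNext(current.Value);
                    self();
                });
            };
            var scheduledTask = scheduler.Schedule(scheduleNext);
            return StableCompositeDisposable.Create(scheduledTask, enumerator);
        });
    }

    // Hypothetical helper: plays five items, one per virtual second, and returns
    // whatever was emitted once the clock has been advanced by `seconds`.
    public static List<int> ValuesAfter(int seconds)
    {
        var start = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
        var scheduler = new HistoricalScheduler(start);
        var series = Enumerable.Range(0, 5)
            .Select(i => Timestamped.Create(i, start.AddSeconds(i)));

        var seen = new List<int>();
        using (series.Playback(scheduler).Subscribe(v => seen.Add(v)))
        {
            // Nothing is emitted until the virtual clock is moved.
            scheduler.AdvanceTo(start.AddSeconds(seconds));
        }
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", ValuesAfter(2)));
    }
}
```

Only items whose timestamps fall at or before the advanced time are emitted. Calling scheduler.Start() instead of AdvanceTo would run the whole sequence to completion in virtual time.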


Source: https://habr.com/ru/post/1261073/

