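Mixing observable sequences (Observable) with tasks (Task) needs some care. Here the Task is started from Interval inside Select. Instead, wrap the call in a deferred Observable, using Defer (or FromAsync for a plain Task):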
// Every 5 seconds, queue one run of DoWork; runs are executed strictly one after another.
m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
    .ObserveOn(m_schedulerProvider.EventLoop)
    // Defer does not wrap a plain Task: its overloads expect a factory returning
    // IObservable<T> (or Task<IObservable<T>>). FromAsync is the operator that turns a
    // Task-returning function into a cold observable, so DoWork is only invoked when
    // Concat subscribes to this inner sequence.
    .Select(_ => Observable.FromAsync(() => DoWork()))
    // Concat subscribes to the next inner sequence only after the previous one completes,
    // so two DoWork calls never overlap.
    .Concat()
    .Subscribe();
, Observable Task, , .
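Alternatively, if the delay should be counted from the moment the previous call finished, you can write your own operator. For example, a GenerateAsync method: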
/// <summary>
/// Builds an observable sequence from asynchronously produced states: the first state comes
/// from <paramref name="initialState"/>, each following state from <paramref name="iterate"/>.
/// Every state is delayed by <paramref name="timeSelector"/> before it is examined; if
/// <paramref name="condition"/> holds, the projected value is emitted and the next state is
/// requested, otherwise the sequence completes.
/// </summary>
/// <typeparam name="TResult">Type of the state carried between iterations.</typeparam>
/// <typeparam name="TOut">Type of the elements emitted to the observer.</typeparam>
/// <param name="initialState">Asynchronous factory for the first state; awaited on each subscription.</param>
/// <param name="condition">Returns <c>false</c> to complete the sequence.</param>
/// <param name="iterate">Asynchronously produces the next state from the current one.</param>
/// <param name="timeSelector">Delay to wait before a given state is processed.</param>
/// <param name="resultSelector">Projects a state into the emitted element.</param>
/// <param name="scheduler">Scheduler used for the delays; <c>Scheduler.Default</c> when <c>null</c>.</param>
/// <returns>An observable that emits the projected states.</returns>
public static IObservable<TOut> GenerateAsync<TResult, TOut>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TimeSpan> timeSelector,
    Func<TResult, TOut> resultSelector,
    IScheduler scheduler = null)
{
    var s = scheduler ?? Scheduler.Default;
    return Observable.Create<TOut>(async obs =>
    {
        var init = await initialState();
        // The callback below is an async lambda bound to an Action, i.e. "async void":
        // anything thrown inside it would escape the observable pipeline instead of
        // reaching the observer, so failures of the user delegates are routed to OnError.
        return s.Schedule(init, timeSelector(init), async (state, recurse) =>
        {
            bool hasNext;
            TOut value = default(TOut);
            try
            {
                hasNext = condition(state);
                if (hasNext)
                {
                    value = resultSelector(state);
                }
            }
            catch (Exception ex)
            {
                obs.OnError(ex);
                return;
            }

            if (!hasNext)
            {
                obs.OnCompleted();
                return;
            }

            obs.OnNext(value);

            TimeSpan delay;
            try
            {
                state = await iterate(state);
                delay = timeSelector(state);
            }
            catch (Exception ex)
            {
                obs.OnError(ex);
                return;
            }

            // Re-schedule this same callback with the new state after the selected delay.
            recurse(state, delay);
        });
    });
}
// Run DoWork without an end condition: each result is emitted five seconds after it was
// produced, and the next DoWork call is started right after that emission.
GenerateAsync(
        initialState: DoWork,
        condition: _ => true,
        iterate: _ => DoWork(),
        timeSelector: _ => TimeSpan.FromSeconds(5),
        resultSelector: result => result,
        scheduler: scheduler)
    .Subscribe();
/, , .