Rx provides the standard Scan operator:
IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source,
TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator)
What is wanted instead is a variant whose accumulator returns an observable:
IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source,
TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator)
... , , .
A recursive re-implementation of Scan:
/// <summary>
/// Recursive running-accumulation operator: folds the first element of
/// <paramref name="source"/> into <paramref name="initialValue"/>, emits the result,
/// and then recurses over the shared sequence using that result as the new seed.
/// </summary>
/// <param name="source">Sequence whose elements are accumulated.</param>
/// <param name="initialValue">Seed passed to <paramref name="accumulator"/> for the first element.</param>
/// <param name="accumulator">Combines the current accumulated value with the next element.</param>
/// <returns>A sequence of the successive accumulated values.</returns>
public static IObservable<TAccumulate> MyScan<TSource, TAccumulate>(this IObservable<TSource> source,
    TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator)
{
    // Publish shares one subscription to the source between the "first element"
    // branch and the recursive continuation.
    return source.Publish(shared =>
    {
        // Only the first element is handled at this level of the recursion.
        var firstAccumulated = shared
            .Take(1)
            .Select(item => accumulator(initialValue, item));

        // Emit the new accumulated value, then continue with it as the seed.
        // Every processed element adds one more nested MyScan level.
        return firstAccumulated.SelectMany(seed => shared.MyScan(seed, accumulator).StartWith(seed));
    });
}
Following the same pattern, a version whose accumulator returns an observable:
/// <summary>
/// Recursive running-accumulation operator whose accumulator produces an observable.
/// The last element of that observable (or the default value when it is empty)
/// is emitted and becomes the seed for the next level of the recursion.
/// </summary>
/// <param name="source">Sequence whose elements are accumulated.</param>
/// <param name="initialValue">Seed passed to <paramref name="accumulator"/> for the first element.</param>
/// <param name="accumulator">Produces an observable carrying the next accumulated value.</param>
/// <returns>A sequence of the successive accumulated values.</returns>
/// <remarks>
/// NOTE(review): the next recursion level subscribes to the shared sequence only after
/// the awaited accumulator result completes, so source elements arriving in between
/// appear not to be observed by it — confirm this is acceptable for the callers.
/// </remarks>
public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source,
    TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator)
{
    // Publish shares one subscription to the source between the "first element"
    // branch and the recursive continuation.
    return source.Publish(shared =>
    {
        // Only the first element is handled at this level of the recursion.
        var firstStep = shared
            .Take(1)
            .Select(item => accumulator(initialValue, item));

        // Wait for the accumulator's final value, then build the continuation
        // that emits it and recurses with it as the new seed.
        var continuations = firstStep.SelectMany(async pending =>
        {
            var seed = await pending.LastOrDefaultAsync();
            return shared.MyObservableScan(seed, accumulator).StartWith(seed);
        });

        // The async projection yields a sequence of sequences; flatten it.
        return continuations.Merge();
    });
}
/// <summary>
/// Task-based convenience overload: wraps each call of <paramref name="accumulator"/>
/// in <c>Observable.FromAsync</c> and forwards to the observable-based overload.
/// </summary>
/// <param name="source">Sequence whose elements are accumulated.</param>
/// <param name="initialValue">Seed passed to <paramref name="accumulator"/> for the first element.</param>
/// <param name="accumulator">Asynchronously computes the next accumulated value.</param>
/// <returns>A sequence of the successive accumulated values.</returns>
public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source,
    TAccumulate initialValue, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator)
{
    // Adapt the Task-returning accumulator to the observable-returning shape.
    Func<TAccumulate, TSource, IObservable<TAccumulate>> observableAccumulator =
        (state, item) => Observable.FromAsync(() => accumulator(state, item));

    return source.MyObservableScan(initialValue, observableAccumulator);
}
/// <summary>
/// Applies <paramref name="selector"/> to <paramref name="t"/> and returns the result,
/// allowing a value to be named and reused inside a fluent expression.
/// </summary>
/// <param name="t">The value handed to the selector.</param>
/// <param name="selector">Function invoked with <paramref name="t"/>.</param>
/// <returns>Whatever <paramref name="selector"/> returns.</returns>
public static U Let<T, U>(this T t, Func<T, U> selector) => selector(t);
An example of using MyObservableScan:
// Example: on every one-minute tick, call Methods.GetNewResultsAsync with the
// CurrentAsOfHandle of the previously accumulated result.
// The seed is null, so the null-conditional access passes a null handle on the first call.
// The tick value itself is ignored (the `_` parameter).
// NOTE(review): presumably GetNewResultsAsync returns Task<ResultSet<string>>, which
// would select the Task-based MyObservableScan overload — confirm.
var o = Observable.Interval(TimeSpan.FromMinutes(1))
.MyObservableScan<long, ResultSet<string>>(null, (r, _) => Methods.GetNewResultsAsync<string>(r?.CurrentAsOfHandle))
, , Task/Observable , , . , . - , .