Convert IEnumerable <Task <T>> to IObservable <T>

I am trying to use Reactive Extensions (Rx) to buffer Jobs enumerations as they are completed. Does anyone know if there is a clean built-in way to do this? The ToObservable extension method ToObservable simply make IObservable<Task<T>> , which is not what I want, I want IObservable<T> to use Buffer then.

A simplified example:

 //Method designed to be awaitable public static Task<int> makeInt() { return Task.Run(() => 5); } //In practice, however, I don't want to await each individual task //I want to await chunks of them at a time, which *should* be easy with Observable.Buffer public static void Main() { //Make a bunch of tasks IEnumerable<Task<int>> futureInts = Enumerable.Range(1, 100).Select(t => makeInt()); //Is there a built in way to turn this into an Observable that I can then buffer? IObservable<int> buffered = futureInts.TasksToObservable().Buffer(15); //???? buffered.Subscribe(ints => { Console.WriteLine(ints.Count()); //Should be 15 }); } 
+6
source share
1 answer

You can use the fact that Task can be converted to observable using another ToObservable() overload .

When you have a collection of (on-position) observables, you can create one observable that contains elements as they are completed using Merge() .

So your code might look like this:

 futureInts.Select(t => t.ToObservable()) .Merge() .Buffer(15) .Subscribe(ints => Console.WriteLine(ints.Count)); 
+7
source

Source: https://habr.com/ru/post/947046/


All Articles