How to use the Rx.NET ForEachAsync extension with an asynchronous action

I have code that transfers data from SQL and writes it to another store. The code is something like this:

using (var cmd = new SqlCommand("select * from MyTable", connection))
{
    using (var reader = await cmd.ExecuteReaderAsync())
    {
        var list = new List<MyData>();
        while (await reader.ReadAsync())
        {
            var row = GetRow(reader);
            list.Add(row);
            if (list.Count == BatchSize)
            {
                await WriteDataAsync(list);
                list.Clear();
            }
        }
        if (list.Count > 0)
        {
            await WriteDataAsync(list);
        }
    }
}

Instead, I would like to use Reactive Extensions for this purpose. Ideally, the code would look like this:

await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async batch => await WriteDataAsync(batch));

However, it seems that the ForEachAsync extension method only takes synchronous actions. Is it possible to write an extension that takes an asynchronous action?

+4
3 answers

Is it possible to write an extension that will take an asynchronous action?

Not directly.

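Rx subscriptions are necessarily synchronous, because Rx is a push-based system. When a data item arrives, it travels through your query until it reaches the final subscription, which in this case executes an Action.

The awaitable methods provided by Rx await the sequence itself. That is, ForEachAsync is asynchronous with respect to the sequence (you asynchronously wait for the sequence to complete), but the subscription inside ForEachAsync (the action taken for each element) must still be synchronous.

To make a sync-to-async transition in your data pipeline, you need a buffer. An Rx subscription can (synchronously) add to the buffer as a producer while an asynchronous consumer retrieves items and processes them. So you need a producer/consumer queue that supports both synchronous and asynchronous operations.

The block types in TPL Dataflow can satisfy this need. Something like this should suffice: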

var obs = StreamDataFromSql().Buffer(BatchSize);
var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch));
using (var subscription = obs.Subscribe(buffer.AsObserver()))
  await buffer.Completion;

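Note that there is no backpressure; as fast as StreamDataFromSql can push data, it is buffered and stored in the input queue of the ActionBlock. Depending on the size and type of the data, this can quickly use a lot of memory.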
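
The same producer/consumer idea can also be sketched with System.Threading.Channels from the base class library. This is a swapped-in alternative for illustration, not what the code above uses, and it assumes .NET Core 3.0 or later with C# 8. The class name, the integer batches, and the two-argument WriteDataAsync are hypothetical stand-ins for the real data source and store.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelBufferSketch
{
    // Hypothetical stand-in for WriteDataAsync from the question.
    public static async Task WriteDataAsync(IList<int> batch, List<int> sink)
    {
        await Task.Delay(10); // simulate asynchronous I/O
        sink.AddRange(batch);
    }

    public static async Task<List<int>> RunAsync()
    {
        var sink = new List<int>();

        // Unbounded queue: like the ActionBlock above, it applies no backpressure.
        var channel = Channel.CreateUnbounded<IList<int>>();

        // Asynchronous consumer: awaits each write before taking the next batch.
        Task consumer = Task.Run(async () =>
        {
            await foreach (var batch in channel.Reader.ReadAllAsync())
                await WriteDataAsync(batch, sink);
        });

        // Synchronous producer: the kind of call an Rx OnNext handler could make.
        for (int i = 0; i < 3; i++)
            channel.Writer.TryWrite(new List<int> { i * 2, i * 2 + 1 });

        // The kind of call an OnCompleted handler could make.
        channel.Writer.Complete();

        await consumer;
        return sink;
    }
}
```

The producer side never awaits, which is what lets it live inside a synchronous subscription, while the consumer side is free to await each write.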

+3

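The relevant pieces are the source code of ForEachAsync and the ToEnumerable and AsObservable methods.

We can write a wrapper around ForEachAsync that awaits a Task-returning function: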

public static async Task ForEachAsync<T>( this IObservable<T> t, Func<T, Task> onNext )
{
    foreach ( var x in t.ToEnumerable() )
        await onNext( x );
}

Example usage:

await ForEachAsync( Observable.Range(0, 10), async x => await Task.FromResult( x ) );
0

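The right thing to do is to use Reactive Extensions properly for the whole job, from creating the connection right up to writing the data.

Here's how: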

IObservable<IList<MyData>> query =
    Observable
        .Using(() => new SqlConnection(""), connection =>
            Observable
                .Using(() => new SqlCommand("select * from MyTable", connection), cmd =>
                    Observable
                        .Using(() => cmd.ExecuteReader(), reader =>
                            Observable
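                                // Defer so that GetRow runs after each successful Read(), not once up front.
                                .While(() => reader.Read(), Observable.Defer(() => Observable.Return(GetRow(reader)))))))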
        .Buffer(BatchSize);

IDisposable subscription =
    query
        .Subscribe(async list => await WriteDataAsync(list));

I couldn't test this code, but it should work. It assumes that WriteDataAsync can also take an IList<MyData>. If it can't, just add a .ToList().
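
One caveat with the Subscribe call above: an async lambda passed to Subscribe is not awaited, so writes can overlap and the caller has no way to wait for them to finish. A minimal sketch of one way around that, assuming the System.Reactive package, wraps each write in Observable.FromAsync and runs the writes one after another with Concat. Observable.Range and the local WriteDataAsync here are hypothetical stand-ins for the SQL query and the real store.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SequentialWriteSketch
{
    public static async Task<List<int>> RunAsync()
    {
        var written = new List<int>();

        // Hypothetical stand-in for WriteDataAsync from the question.
        async Task WriteDataAsync(IList<int> batch)
        {
            await Task.Delay(10); // simulate asynchronous I/O
            written.AddRange(batch);
        }

        await Observable.Range(0, 10)   // stand-in for the SQL-backed sequence
            .Buffer(4)
            // FromAsync is lazy: the write starts only when Concat subscribes to it.
            .Select(batch => Observable.FromAsync(() => WriteDataAsync(batch)))
            // Concat subscribes to one inner sequence at a time, so writes do not overlap.
            .Concat()
            // LastOrDefaultAsync keeps the await from throwing when there are no batches.
            .LastOrDefaultAsync();

        return written;
    }
}
```

Concat still queues batches that arrive while a write is in progress, so this serializes the writes but does not slow the source down.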

0

Source: https://habr.com/ru/post/1682537/

