Observable.Using with an async Task

I have used Observable.Using with methods that return an IDisposable, like this:

Observable.Using(() => new Stream(), s => DoSomething(s)); 

But how should we proceed when the stream is created asynchronously? As in this:

    Observable.Using(async () => await CreateStream(), s => DoSomething(s));

    async Task<Stream> CreateStream() { ... }

    DoSomething(Stream s) { ... }

This does not compile, because the compiler says that s is a Task<Stream> instead of a Stream.

What's the deal?

2 answers

Let's look at the source of the async overload of Observable.Using:

    Observable.FromAsync(resourceFactoryAsync)
        .SelectMany(resource =>
            Observable.Using(
                () => resource,
                r => Observable.FromAsync(ct => observableFactoryAsync(r, ct)).Merge()));

Knowing that it just uses the synchronous version under the hood, you can do something similar to adapt it to your use:

    Observable.FromAsync(CreateStream)
        .SelectMany(stream => Observable.Using(() => stream, DoSomething));

Unfortunately, no overload is included for this case, but you can always create your own:

    public static class ObservableEx
    {
        public static IObservable<TSource> Using<TSource, TResource>(
            Func<Task<TResource>> resourceFactoryAsync,
            Func<TResource, IObservable<TSource>> observableFactory)
            where TResource : IDisposable =>
            Observable.FromAsync(resourceFactoryAsync).SelectMany(
                resource => Observable.Using(() => resource, observableFactory));
    }

and then it's simple:

 ObservableEx.Using(CreateStream, DoSomething); 

All of this assumes that DoSomething returns an observable, which is not mentioned in your question but is required by the contract of Observable.Using.
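To see the helper at work end to end, here is a minimal sketch, assuming the System.Reactive package is referenced. TrackedResource and AsyncUsingSketch are hypothetical names introduced only for this illustration; the Using method repeats the shape of ObservableEx.Using from above so that the sketch stands alone.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical resource that records whether it was disposed.
public sealed class TrackedResource : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

public static class AsyncUsingSketch
{
    // Same shape as ObservableEx.Using above, repeated so the sketch is self-contained.
    public static IObservable<TSource> Using<TSource, TResource>(
        Func<Task<TResource>> resourceFactoryAsync,
        Func<TResource, IObservable<TSource>> observableFactory)
        where TResource : IDisposable =>
        Observable.FromAsync(resourceFactoryAsync).SelectMany(
            resource => Observable.Using(() => resource, observableFactory));

    public static (IList<int> Values, bool Disposed) Run()
    {
        var resource = new TrackedResource();

        // Hypothetical async factory; Task.Yield forces a real asynchronous hop.
        async Task<TrackedResource> CreateAsync()
        {
            await Task.Yield();
            return resource;
        }

        // Wait blocks until the sequence completes and returns its last element.
        IList<int> values = Using(CreateAsync, _ => Observable.Range(1, 3))
            .ToList()
            .Wait();

        return (values, resource.IsDisposed);
    }
}
```

Calling AsyncUsingSketch.Run() from a console program should yield the three values from the range, with the resource already disposed once the sequence has completed.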


The problem is that Observable.Using has two overloads with rather awkward type signatures.

When you pass a function returning Task<TResource> as the first argument, it would seem that the second argument, the "observable factory", should also return a Task. Type inference then fails, because your example does not declare the CancellationToken parameter, and both factories require it. Inference can fail in several ways here, and the resulting errors are confusing.
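Applied to the code from the question, the async overload can be called roughly as follows. This is a sketch, assuming System.Reactive is referenced; CreateStreamAsync and DoSomething are hypothetical stand-ins for the question's methods, and BuiltInAsyncUsingSketch is a name invented for the illustration. Both lambdas declare the CancellationToken, and the observable factory returns a Task of an observable.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BuiltInAsyncUsingSketch
{
    // Hypothetical stand-in for CreateStream from the question.
    static async Task<MemoryStream> CreateStreamAsync(CancellationToken ct)
    {
        await Task.Delay(10, ct);
        return new MemoryStream(new byte[] { 1, 2, 3 });
    }

    // Hypothetical stand-in for DoSomething; it has to return an observable.
    static IObservable<long> DoSomething(Stream s) => Observable.Return(s.Length);

    public static IObservable<long> Build() =>
        Observable.Using(
            // The resource factory must accept a CancellationToken.
            resourceFactoryAsync: ct => CreateStreamAsync(ct),
            // The observable factory receives the actual resource plus a token
            // and returns a Task that wraps the observable.
            observableFactoryAsync: (stream, ct) => Task.FromResult(DoSomething(stream)));
}
```

Subscribing to BuiltInAsyncUsingSketch.Build() should produce a single value, the stream length, after which the stream is disposed.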

I will try to put this in perspective with a realistic example.

Note: writing code like the following is probably a bad idea, but it is at least a realistic kind of use.

    var connectionString = @"Data Source=.\SQLEXPRESS;Integrated Security=SSPI;app=LINQPad";

    Observable.Using(
        resourceFactoryAsync: async ct => await ConnectAsync(connectionString, ct),
        observableFactoryAsync: async (connection, ct) => await QueryAsync(connection, ct)
    )
    .Subscribe(
        onNext: Console.WriteLine,
        onError: Console.Error.WriteLine
    );

    async Task<IObservable<string>> QueryAsync(SqlConnection c, CancellationToken ct = default)
    {
        // The command is deliberately not wrapped in a using block (see the notes below).
        var command = c.CreateCommand();
        command.CommandText = "select * from Categories as c order by c.CategoryId";
        var enumerableQuery =
            from IDataRecord record in await command.ExecuteReaderAsync(ct)
            select (string)record["CategoryName"];
        return enumerableQuery.ToObservable();
    }

    async Task<SqlConnection> ConnectAsync(string cs, CancellationToken ct = default)
    {
        var connection = new SqlConnection(cs);
        await connection.OpenAsync(ct);
        return connection;
    }

Note: using named arguments makes it easy to document which overload is selected.

This was the best I could come up with, but feel free to edit it with better examples.

Points worth noting

  • All of this falls apart and compilation fails if a nullary (parameterless) function is supplied for resourceFactoryAsync, or if observableFactoryAsync omits its token parameter. You must declare a CancellationToken as a formal parameter of both the resource factory and the observable factory functions. Both are named ct in the code above for brevity. Even though we only pass them along, it is important that they are declared.

  • Note that the connection parameter of observableFactoryAsync is not an awaitable but the actual resource. The error you received, which naturally suggests that the resource has to be awaited, is caused by type inference selecting the synchronous overload and thereby forwarding TResource as Task<X> to the next function, which is also treated as synchronous.

  • The way these abstractions compose seems rather awkward, at least to me. We are mixing several asynchronous and synchronous programming styles and adding IDisposable on top, so we create the resources explicitly and asynchronously, but it is far less obvious when they get disposed.

  • The QueryAsync function actually creates a secondary IDisposable, a SqlCommand, which I would normally wrap in a using block. I cannot do that here, because the query has to be evaluated lazily, so the command must not yet be disposed when the returned closure is invoked.

  • There is probably a better way to do this, and I wonder whether it involves calling Observable.Using several times and then merging the results. Right now I am mostly amazed at how complex the Observable monad is (I am an Rx newbie), and it really seems to want your entire application to be wrapped inside it.
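One possible direction for the secondary-disposable point, offered as a sketch and not as the answer author's method, is to nest Observable.Using so that the second resource's lifetime is also tied to the subscription. The code assumes System.Reactive is referenced; LoggedDisposable and NestedUsingSketch are hypothetical names, and the disposables stand in for SqlConnection and SqlCommand so that no database is needed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical disposable that logs its own disposal.
public sealed class LoggedDisposable : IDisposable
{
    private readonly string _name;
    private readonly List<string> _log;

    public LoggedDisposable(string name, List<string> log)
    {
        _name = name;
        _log = log;
    }

    public void Dispose() => _log.Add(_name + " disposed");
}

public static class NestedUsingSketch
{
    public static IObservable<string> Query(List<string> log) =>
        Observable.Using(
            // Outer resource, standing in for the connection.
            () => new LoggedDisposable("connection", log),
            connection => Observable.Using(
                // Inner resource, standing in for the command.
                () => new LoggedDisposable("command", log),
                // Hypothetical query result; a real query would read from the command.
                command => Observable.Return("row")));
}
```

After the sequence completes, the log should contain an entry for both resources. The sketch makes no claim about the order in which the two are disposed.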


Source: https://habr.com/ru/post/1272148/