So the problem is that there are two overloads with very inconvenient type signatures.
When you pass the return function Task<TResource> as the first argument, it seems that the second argument, the “observed factory”, will also return the Task , but the output is then not executed, because in your example, the parameter declared for the cancellation marker by one function is missing, and both require this parameter. The conclusion fails in many ways, and it is confusing.
I will try to present it in perspective with a realistic example.
Note: recording code such as the following is probably a bad idea, but it is at least a real type of use.
var connectionString = @"Data Source=.\SQLEXPRESS;Integrated Security=SSPI;app=LINQPad"; Observable.Using( resourceFactoryAsync: async ct => await ConnectAsync(connectionString, ct), observableFactoryAsync: async (connection, ct) => await QueryAsync(connection, ct) ) .Subscribe( onNext: Console.WriteLiner, onError: Console.Error.WriteLine ); async Task<IObservable<string>> QueryAsync(SqlConnection c, CancellationToken ct = default) { var command = c.CreateCommand(); command.CommandText = "select * from Categories as c order by c.CategoryId"; var enumerableQuery = from IDataRecord record in await command.ExecuteReaderAsync(ct) select (string)record["CategoryName"]; return enumerableQuery.ToObservable(); } async Task<SqlConnection> ConnectAsync(string cs, CancellationToken ct = default) { var connection = new SqlConnection(cs); await connection.OpenAsync(ct); return connection; }
Note: to use named arguments, it is easy to document which overload is selected.
It was the best I could come up with, but feel free to edit it with the best examples.
Signs Noteworthy
All this completely collapses, compilation fails if a null function is provided for the resourceFactoryAsync or observableFactoryAsync function. You must declare a CancellationToken as a formal parameter of both the factory resource and the observed factory functions. Both are named ct in the code above for short. Although we just pass them on, it’s important that they are announced.
note that the connection observableFactoryAsync parameter is not an expected, but an actual resource. The error you received, suggesting that you naturally assume that the resource should be expected, is caused by an output type that selects synchronous overload and thereby forwards TResource as Task<X> to the next function, which was also considered synchronous.
The way these abstractions are composed seems rather inconvenient, at least for me. We mix several asynchronous and synchronous programming styles, and we mix IDisposable with it so that we create them explicitly, asynchronously, but who knows how they will be deleted.
The QueryAsync function actually creates a secondary IDisposable , SqlCommand , which I usually wrap in a using block, but I cannot, because I need a lazy evaluation of the stored query, and therefore I need the command to not be deleted when the return closure is called.
There is probably a better way to do this, but I wonder if it involves calling Observable.Using several times, and then merging them. I'm basically just amazed right now about how complex the Observable monad is (if I'm a Rx newbie), but it really seems like it wants your entire application to be wrapped inside it.
source share