Retry asynchronous Task code with Reactive Extensions

I have the code below in my data access class.

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
            CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
        {
            using (var connection = Connection)
            {
                var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
                Task<IEnumerable<TEntity>> queryTask =
                    connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
                        commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
                IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
                connection.Close();
                connection.Dispose();
                tokenSource.Dispose();
                return data;
            }
        }

I want to retry once when a SqlException is thrown. Keep in mind that I cannot apply Rx to the whole application, only to this block of code.

I tried the code below. It appears to execute correctly, and the Do handler writes to the console output, but the Catch handler is never actually called, and I'm not sure whether the Retry handler runs either.

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
            CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
        {
            return await Observable.Defer(async () =>
            {
                using (var connection = Connection)
                {
                    var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
                    Task<IEnumerable<TEntity>> queryTask =
                        connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
                            commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
                    IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
                    connection.Close();
                    connection.Dispose();
                    tokenSource.Dispose();
                    return Observable.Return(data);
                }
            })
            .Catch<IEnumerable<TEntity>, SqlException>(source =>
           {
               Debug.WriteLine($"QueryAsync Exception {source}");
               return Observable.Return(new List<TEntity>());
           })
           .Throttle(TimeSpan.FromMilliseconds(500))
           .Retry(1)
           .Do(_ => Debug.WriteLine("Do QueryAsync"));
        }
1 answer

I see several potential problems with your code:

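  • Separate the retry logic from the main query logic, for example in a method called QueryWithRetryAsync. This is only a design issue, but an issue nonetheless.
  • Do not Catch until after the Retry. Otherwise the SqlException is turned into an empty list and Retry never sees the exception.
  • I don't think Throttle is needed at all, since you only ever expect one value through the pipeline.
  • Retry(1) does not do what you think it does (this surprised me as well). The count appears to include the first invocation, so for one "retry" you need Retry(2).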
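The effect of operator order in the second point can be checked in isolation. The following is a minimal sketch, assuming the System.Reactive package is referenced; OperatorOrderDemo and CountAttempts are hypothetical names introduced only for illustration, and the source is a stand-in that always fails rather than a real query.

```csharp
using System;
using System.Reactive.Linq;

static class OperatorOrderDemo
{
    // Hypothetical helper: returns how many times the deferred factory ran,
    // i.e. how many attempts the pipeline actually made.
    public static int CountAttempts(bool catchBeforeRetry)
    {
        var attempts = 0;

        // Every subscription re-runs the factory and fails immediately.
        var source = Observable.Defer(() =>
        {
            attempts++;
            return Observable.Throw<string>(new InvalidOperationException());
        });

        var pipeline = catchBeforeRetry
            // Catch swaps the error for a value, so Retry only ever sees success.
            ? source
                .Catch<string, InvalidOperationException>(_ => Observable.Return("fallback"))
                .Retry(2)
            // Retry resubscribes first; Catch handles the error left after the last attempt.
            : source
                .Retry(2)
                .Catch<string, InvalidOperationException>(_ => Observable.Return("fallback"));

        // Wait blocks until the sequence completes and returns its last element.
        pipeline.Wait();
        return attempts;
    }

    static void Main()
    {
        Console.WriteLine(CountAttempts(catchBeforeRetry: true));  // 1: no retry happened
        Console.WriteLine(CountAttempts(catchBeforeRetry: false)); // 2: first call plus one retry
    }
}
```

Both variants end with the fallback value; they differ only in whether a second attempt is made before the fallback is used.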

Here is a standalone example that behaves the way you want:

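using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var pipeline = Observable
            // Defer re-invokes DoSomethingAsync on every (re)subscription.
            .Defer(() => DoSomethingAsync().ToObservable())
            // Retry(2) allows two attempts in total: the first call plus one retry.
            .Retry(2)
            // Catch comes last so it only handles a failure that survives the retry.
            .Catch<string, InvalidOperationException>(ex => Observable.Return("default"));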

        pipeline
            .Do(Console.WriteLine)
            .Subscribe();

        Console.ReadKey();
    }

    private static int invocationCount = 0;

    private static async Task<string> DoSomethingAsync()
    {
        Console.WriteLine("Attempting DoSomethingAsync");

        await Task.Delay(TimeSpan.FromSeconds(2));

        ++invocationCount;

        if (invocationCount == 2)
        {
            return "foo";
        }

        throw new InvalidOperationException();
    }
}
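
The same pattern can be wrapped in a reusable helper so the retry logic stays out of the query method, as the first point suggests. This is a sketch under assumptions rather than a drop-in implementation: RetryHelper, WithRetryAsync, and the totalAttempts parameter are hypothetical names, and the demo uses a fake operation instead of a database call. Note that Retry resubscribes on any exception, not only on TException; only the final Catch is type-specific.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class RetryHelper
{
    // Hypothetical helper: runs the operation up to totalAttempts times and
    // returns the fallback if a TException is still thrown on the last attempt.
    public static async Task<T> WithRetryAsync<T, TException>(
        Func<Task<T>> operation, T fallback, int totalAttempts = 2)
        where TException : Exception
    {
        return await Observable
            .Defer(() => operation().ToObservable()) // fresh call per subscription
            .Retry(totalAttempts)                    // count includes the first call
            .Catch<T, TException>(_ => Observable.Return(fallback));
    }
}

static class Demo
{
    static async Task Main()
    {
        var calls = 0;

        // Fake operation: fails on the first call, succeeds on the second.
        var result = await RetryHelper.WithRetryAsync<string, InvalidOperationException>(
            async () =>
            {
                await Task.Delay(10);
                if (++calls < 2) throw new InvalidOperationException();
                return "foo";
            },
            fallback: "default");

        Console.WriteLine($"{result} after {calls} calls"); // foo after 2 calls
    }
}
```

In the data access class from the question, the original method body (including its using block, so that each attempt opens its own connection) would be passed as the operation, with SqlException as TException and an empty List<TEntity> as the fallback.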

Source: https://habr.com/ru/post/1666551/

