SqlConnection not closed when using async

I have a project with a SQL Server database and Dapper as the ORM. I am trying to use Dapper's QueryAsync() method to get some data. On top of that, the calls to my repo come from several tasks that are awaited with Task.WhenAll (that is, each task involves getting data from this repo, so each task awaits my repo method, which wraps QueryAsync()).

The problem is that my SqlConnections never close, even though I use a using block. As a result, I end up with 100+ open connections to my database and eventually start getting "max pool size reached" exceptions. The thing is, when I switch to Query() instead of QueryAsync(), it works fine, but I would like to do this asynchronously.

Here is some sample code. I tried to mimic the structure of the actual application as closely as possible, so it looks more complex than it needs to be.

Interface:

public interface IFooRepository<T> where T: FooBase
{
    Task<IEnumerable<T>> Select(string account, DateTime? effectiveDate = null);
}

Implementation:

public class FooRepository : RepositoryBase, IFooRepository<FuturePosition>
{
    private readonly IWebApiClientRepository _accountRepository;

    public FooRepository(IWebApiClientRepository repo)
    {
        _accountRepository = repo;
    }
    public async Task<IEnumerable<FuturePosition>> Select(string code, DateTime? effectiveDate = null)
    {
        effectiveDate = effectiveDate ?? DateTime.Today.Date;
        var referenceData = await _accountRepository.GetCrossRefferenceData(code, effectiveDate.Value);
        using (var connection = new SqlConnection("iamaconnectionstring"))
        {
            connection.Open();
            try
            {
                var res = await connection.QueryAsync<FuturePosition>(SqlQueryVariable + "AND t.code = @code;",
                    new
                    {
                        effectiveDate = effectiveDate.Value,
                        code = referenceData.Code
                    });

                foreach (var item in res)
                {
                    item.PropFromReference = referenceData.PropFromReference;
                }
                return res;
            }
            catch (Exception e)
            {
                //log 
                throw;
            }
            finally
            {
                connection.Close();
            }
        }
    }
}

Now for the calling code, which has two layers. I will start from the outermost one. I think this is where the problem is. There are comments in the code below.

Populator:

public class Populator : PopulatorBase
{
    private IAccountRepository _acctRepository;
    public override async Task<IEnumerable<ProcessResult>> ProcessAccount(DateTime? popDate = null)
    {
        //My attempt at throttling the async calls
        //I was hoping this would force a max of 10 simultaneous connections.
        //It did not work.
        SemaphoreSlim ss = new SemaphoreSlim(10,10);
        var accountsToProcess = _acctRepository.GetAllAccountsToProcess();
        var accountNumbers = accountsToProcess.SelectMany(a => a.accountNumbers).ToList();

        List<Task<ProcessResult>> trackedTasks = new List<Task<ProcessResult>>();
        foreach (var item in accountNumbers)
        {
            await ss.WaitAsync();
            trackedTasks.Add(ProcessAccount(item.AccountCode, popDate ?? DateTime.Today));
            ss.Release();
        }
        //my gut tells me the issue is because of these tasks
        var results = await Task.WhenAll(trackedTasks);
        return results;
    }

    private async Task<ProcessResult> ProcessAccount(string accountCode, DateTime? popDate)
    {
        var createdItems = await _itemCreator.MakeItems(popDate, accountCode);
        return Populate(accountCode, createdItems);
    }
}

ItemCreator:

public class ItemCreator : ItemCreatorBase
{
    private readonly IFooRepository<FuturePosition> _fooRepository;
    private readonly IBarRepository<FuturePosition> _barRepository;

    public ItemCreator()
    {
        //standard constructor stuff
    }
    public async Task<ItemCreationResult> MakeItems(DateTime? effectiveDate, string account)
    {
        DateTime reconDate = effectiveDate ?? DateTime.Today.Date;

        //this uses the repository I outlined above
        var foos = await _fooRepository.Select(account, effectiveDate);

        //this repository uses a REST client, I doubt it is the problem
        var bars = await _barRepository.Select(account, effectiveDate);

        //just trying to make this example less lengthy
        var foobars = MakeFoobars(foos, bars);
        var result = new ItemCreationResult { EffectiveDate = effectiveDate, Items = foobars };
        return result;
    }
}

What I have tried so far:

  • Throttling with SemaphoreSlim
  • No throttling
  • Using connection.OpenAsync() in the repo
  • With and without the finally block (it should be irrelevant with using)

, foreach 500 . , 500 . populate, Foo.




The SemaphoreSlim is not being used correctly. It should look something like this:

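public override async Task<IEnumerable<ProcessResult>> ProcessAccount(DateTime? popDate = null)
{
    // ss, accountNumbers and trackedTasks are set up as in the question.
    foreach (var item in accountNumbers)
    {
        await ss.WaitAsync().ConfigureAwait(false);
        trackedTasks.Add(new Func<Task<ProcessResult>>(async () =>
        {
            try
            {
                return await ProcessAccount(item.AccountCode, popDate ?? DateTime.Today).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                //log, etc.
                throw; // rethrow so the lambda either returns a result or fails
            }
            finally
            {
                // Release only once the work has finished, so the slot is held for its whole duration.
                ss.Release();
            }
        })());
    }
    return await Task.WhenAll(trackedTasks).ConfigureAwait(false);
}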
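
To see the effect of holding the semaphore until the work completes, here is a minimal self-contained sketch. It does not touch a database: FakeDbCallAsync, ThrottleDemo and RunThrottledAsync are hypothetical names made up for this illustration, and the Task.Delay merely stands in for the QueryAsync() call. It only counts how many calls are in flight at the same time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Counters for the illustration only; a real repository would open a SqlConnection instead.
    private static int _inFlight;
    private static int _maxInFlight;

    // Hypothetical stand-in for the repository call.
    private static async Task<int> FakeDbCallAsync(int id)
    {
        int now = Interlocked.Increment(ref _inFlight);
        // Record the highest concurrency seen so far (compare-and-swap loop).
        int seen;
        while (now > (seen = Volatile.Read(ref _maxInFlight)))
        {
            Interlocked.CompareExchange(ref _maxInFlight, now, seen);
        }
        try
        {
            await Task.Delay(20).ConfigureAwait(false); // simulated I/O
            return id;
        }
        finally
        {
            Interlocked.Decrement(ref _inFlight);
        }
    }

    // Starts 'count' calls but lets at most 'limit' of them run at once.
    // Returns the highest number of simultaneous calls that was observed.
    public static async Task<int> RunThrottledAsync(int count, int limit)
    {
        _inFlight = 0;
        _maxInFlight = 0;
        using (var gate = new SemaphoreSlim(limit, limit))
        {
            var tasks = new List<Task<int>>();
            for (int i = 0; i < count; i++)
            {
                await gate.WaitAsync().ConfigureAwait(false);
                tasks.Add(RunOneAsync(gate, i));
            }
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
        return _maxInFlight;
    }

    private static async Task<int> RunOneAsync(SemaphoreSlim gate, int id)
    {
        try
        {
            return await FakeDbCallAsync(id).ConfigureAwait(false);
        }
        finally
        {
            gate.Release(); // released only after the work has finished
        }
    }
}
```

Calling await ThrottleDemo.RunThrottledAsync(50, 10) should return a value no greater than 10. If Release() is moved back to right after tasks.Add(...), as in the question, the slot is handed back while the call is still running and that bound no longer holds.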



Source: https://habr.com/ru/post/1692335/
