I have a project with a Sql-Server database database and Dapper as ORM. I am trying to use the Dapper method QueryAsync()
to get some data. Not only this, but also the call of my repo comes from several tasks called using Task.WhenAll
(That is, each task involves obtaining data from this repo, so each task expects my repo method, which wraps QueryAsync()
).
The problem is that my SqlConnections never close, although I use a block using
. As a result, I have 100+ open connections to my database and, in the end, start to get exceptions "maximum pool size". The thing is, when I switch to Query()
instead QueryAsync()
, it works fine, but I would like to do it asynchronously.
Here is a sample code. I tried to imitate the structure of the actual application as best as possible, so it looks more complex than it should be.
Interface:
public interface IFooRepository<T> where T: FooBase
{
Task<IEnumerable<T>> Select(string account, DateTime? effectiveDate = null);
}
Implementation:
public class FooRepository : RepositoryBase, IFooRepository<SpecialFoo>
{
private readonly IWebApiClientRepository _accountRepository;
public FooRepository(IWebApiClientRepository repo)
{
_accountRepository = repo;
}
public async Task<IEnumerable<FuturePosition>> Select(string code, DateTime? effectiveDate = null)
{
effectiveDate = effectiveDate ?? DateTime.Today.Date;
var referenceData = await _accountRepository.GetCrossRefferenceData(code, effectiveDate.Value);
using (var connection = new SqlConnection("iamaconnectionstring")
{
connection.Open();
try
{
var res = await connection.QueryAsync<FuturePosition>(SqlQueryVariable + "AND t.code = @code;",
new
{
effectiveDate = effectiveDate.Value,
code = referenceData.Code
});
foreach (var item in res)
{
item.PropFromReference = referenceData.PropFromReference;
}
return res;
}
catch (Exception e)
{
throw;
}
finally
{
connection.Close();
}
}
}
}
So, now with the call code there are 2 layers. I will start from the outside. I think this is a problem. There are comments below.
Populator:
public class Populator : PopulatorBase
{
private IAccountRepository _acctRepository;
public override async Task<IEnumerable<PopulationResult>> ProcessAccount(DateTime? popDate = null)
{
SemaphoreSlim ss = new SemaphoreSlim(10,10);
var accountsToProcess = _acctRepository.GetAllAccountsToProcess();
var accountNumbers = accountsToProcess.SelectMany(a => a.accountNumbers).ToList();
List<Task<ProcessResult>> trackedTasks = new List<Task<ProcessResult>>();
foreach (var item in accountNumbers)
{
await ss.WaitAsync();
trackedTasks.Add(ProcessAccount(item.AccountCode, popDate ?? DateTime.Today));
ss.Release();
}
var results = await Task.WhenAll(trackedTasks);
return results;
}
private async Task<ProcessResult>ProcessAccount(string accountCode, DateTime? popDate)
{
var createdItems = await _itemCreator.MakeExceptions(popDate, accountCode);
return Populate(accountCode, createdItems);
}
}
ItemCreator:
public class ItemCreator : ItemCreatorBase
{
private readonly IFooRepository<FuturePosition> _fooRepository;
private readonly IBarRepository<FuturePosition> _barRepository;
public RussellGlobeOpFutureExceptionCreator() )
{
}
public async Task<ItemCreationResult> MakeItems(DateTime? effectiveDate, string account)
{
DateTime reconDate = effectiveDate ?? DateTime.Today.Date;
var foos = await _fooRepository.Select(account, effectiveDate);
var bars = await _barRepository.Select(account, effectiveDate);
var foobars = MakeFoobars(foos, bars);
var result = new ItemCreationResult { EffectiveDate = effectiveDate, Items = foobars };
return result;
}
}
As far as I tried:
- Throttling with SemaphoreSlim
- No throttling
connection.OpenAnync()
Repo usage- / finally (
using
)
, foreach
500 . , 500 . populate
, Foo.
. , async db . .