Unit testing async methods in C#

I have a problem.

I am trying to reproduce a series of stored procedure (SP) calls in my C# code, but I want to run them asynchronously.

TSQL example: ( Execute sp @key = 15072000173475; Execute sp @key = 15072000173571; ... Execute sp @key = n;)

/// <summary>
/// Test class from the question: executes the stored procedure <c>[dbo].[sp]</c> once per key,
/// starting one task per key and opening a separate <see cref="SqlConnection"/> for each call.
/// </summary>
[TestClass]
public class UnitTestNomenclature {
    /// <summary>
    /// Starts one <c>Task.Run</c> per key, each awaiting <see cref="RunStoredProc"/>, and blocks
    /// until all of them have completed. Exceptions are written to the debug output only.
    /// </summary>
    [TestMethod]
    public void ParallelSQLMethod() {
        Task scropeTasks = null;
        //real amount is more than 1500
        long[] keys = new long[] {15072000173475,15072000173571 ... n };

        try {
            // Select is deferred: no task is created on this line.
            var tasks = keys.Select( i =>  Task.Run(async () => { await RunStoredProc(i); }));
            // WhenAll enumerates the sequence, so every Task.Run is issued here; nothing in this
            // method limits how many RunStoredProc calls (and connections) are in flight at once.
            scropeTasks =  Task.WhenAll(tasks);

            // Blocks the test thread until every task has finished (or one has failed).
            scropeTasks.Wait();
        } catch (Exception ex) {
            Debug.WriteLine("Exception: " + ex.Message);

            // NOTE(review): scropeTasks is still null if the exception was raised before the
            // assignment above completed; the two accesses below would then throw as well.
            Debug.WriteLine("IsFaulted: " + scropeTasks.IsFaulted);
            foreach (var inx in scropeTasks.Exception.InnerExceptions) {
                Debug.WriteLine("Details: " + inx.Message);
            }
        }

        // Always true: failures caught above are only logged, so they never fail this test.
        Assert.AreEqual(1, 1);
    }

    /// <summary>
    /// Opens a new connection, executes <c>[dbo].[sp]</c> with the given key as the
    /// <c>@KEYKRT</c> parameter, and disposes the connection again.
    /// </summary>
    /// <param name="scollNumbParam">Key passed to the stored procedure as a BigInt parameter.</param>
    /// <returns>A task that completes when the command has finished and the connection is disposed.</returns>
    public async Task RunStoredProc(long scollNumbParam) {
        const string strStoredProcName = @"[dbo].[sp]";
        // A new SqlConnection object is created for every call of this method.
        using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;")) {
            await conn.OpenAsync();
            Debug.WriteLine("============================================ Connection is open: ==============================================");

            // info
            Debug.WriteLine(String.Format("Connection: {0}", conn.ClientConnectionId));
            Debug.WriteLine(String.Format("State: {0}", conn.State.ToString()));

            using (SqlCommand cmd = new SqlCommand(strStoredProcName, conn) { CommandTimeout = 120, CommandType = CommandType.StoredProcedure }) {

                SqlParameter scrParam = new SqlParameter() {
                    ParameterName = "@KEYKRT",
                    Value = scollNumbParam,
                    SqlDbType = SqlDbType.BigInt
                };
                cmd.Parameters.Add(scrParam);

                Debug.WriteLine("Start of Proccesing: " + scollNumbParam);
                // Only this await uses ConfigureAwait(false); the OpenAsync await above does not.
                await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);
                Debug.WriteLine("End of Proccesing: " + scollNumbParam);

            }
        }

        // Written after the using block has disposed the connection.
        Debug.WriteLine("============================================ Connection is closed: ==============================================");
    }
}

This is what I get in the output window:

========== Connection is open: ========
Connection: 5be9c681-6eb5-422f-a22c-b49689a2d912
State: Open
Start of Proccesing: 15072000173475
========== Connection is open: ==========
Connection: cfb66041-6646-4b56-be1c-2afb26a18cb8
State: Open
Start of Proccesing: 15072000173571
.....
End of Proccesing: 15072000173475
=========== Connection is closed: =========
End of Proccesing: 15072000173571
=========== Connection is closed: =========

....

A timeout occurred while waiting for memory resources to execute the query in resource pool 'default' (2). Rerun the query.
Actual error number: 8645
Actual line number: 98

The debugger also reports that the connection pool is exhausted. I think the main cause is the way the connections are being handled, but how can I do this correctly using async?

If I open only one connection before the async tasks are created and pass it to my RunStoredProc method, I get an error saying that the connection does not support MultipleActiveResultSets:

// Variant from the question: one connection is created up front and handed to every task.
using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;")) {
    // Open synchronously: the original called OpenAsync() without awaiting or waiting on it,
    // so the tasks below could start before the connection was actually open.
    conn.Open();

    // Every task receives the same SqlConnection instance.
    var tasks = keys.Select(i => Task.Run(async () => { await RunStoredProc(i, conn); }));
    scropeTasks = Task.WhenAll(tasks);

    // Block until all tasks finish so the connection is not disposed while still in use.
    scropeTasks.Wait();
}

Debug.WriteLine("========== Connection is closed: ==========");

This is what I get in the output window:

Connection: 5be9c681-6eb5-422f-a22c-b49689a2d912
State: Open
Start of Proccesing: 15072000173475
======= Connection is open: =============
Connection: cfb66041-6646-4b56-be1c-2afb26a18cb8
State: Open
Start of Proccesing: 15072000173571
========= Connection is open: =========
+4
3

1500 , , (, .Wait), .

async void, .

. , , , , . , .

/// <summary>
/// Runs the stored procedure for every key in fixed-size batches, awaiting each batch
/// before starting the next so that at most <c>batchSize</c> calls are in flight at once.
/// </summary>
/// <returns>A task that completes when every batch has finished.</returns>
[TestMethod]
public async Task ParallelSQLMethod() {
    //real amount is more than 1500
    var keys = new long[] {
        15072000173475,
        15072000173571,
        //....., n
    };
    const int batchSize = 50; //Or smaller

    // Batch the keys, not the tasks. A lazy Select(i => RunStoredProc(i)) re-runs its selector
    // on every enumeration, so Any()/Take()/Skip() over it can call RunStoredProc for the same
    // key more than once and leave started tasks un-awaited.
    for (var offset = 0; offset < keys.Length; offset += batchSize) {
        // ToArray() starts each call in this batch exactly once before awaiting.
        var batch = keys
            .Skip(offset)
            .Take(batchSize)
            .Select(key => RunStoredProc(key))
            .ToArray();

        await Task.WhenAll(batch);
    }
}
+5

, async/await/concurrent/threading . , .

1). , unit test SQL .

, Max (n_cores/2, 1).

, - 1-3 .

/ , / , , ..

2) SQL- concurrency. , - 1500 . .

, : MultipleActiveResultSets.

, .

! , , , . ...

3) , :

  • ,
  • ConcurrentQueue;
  • Task [] , ,
  • ,
  • Task.WhenAll();

/ , , , , ...

4) :

/// <summary>
/// Test class that drains a shared queue of keys through a small, fixed number of
/// workers, each of which owns one open <see cref="SqlConnection"/>.
/// </summary>
[TestClass]
public class UnitTestNomenclature
{
    /// <summary>
    /// Fills a queue with the keys, starts <c>Max(1, ProcessorCount / 2)</c> workers on it,
    /// and awaits all of them.
    /// </summary>
    [TestMethod]
    public async Task ParallelSQLMethod()
    {
        var pending = new ConcurrentQueue<long>(new long[] { 15072000173475, 15072000173571 });
        var workerCount = Math.Max(1, Environment.ProcessorCount / 2);

        var workers = new Task[workerCount];
        for (var slot = 0; slot < workerCount; slot++)
        {
            // Copy the loop variable so each lambda captures its own worker index.
            var workerId = slot;
            workers[slot] = Task.Run(() => RunConnection(workerId, pending));
        }

        await Task.WhenAll(workers);
    }

    /// <summary>
    /// Opens one connection and keeps dequeuing keys from <paramref name="queue"/>, running
    /// the stored procedure for each, until the queue is empty.
    /// </summary>
    /// <param name="connection">Index of this worker; used only in the debug output.</param>
    /// <param name="queue">Queue of keys shared by all workers.</param>
    public async Task RunConnection(int connection, ConcurrentQueue<long> queue)
    {
        const string connectionString = @"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;";

        using (var conn = new SqlConnection(connectionString))
        {
            await conn.OpenAsync();
            Debug.WriteLine($"====== Connection[{connection}] is open: ======");

            LogConnectionState(connection, conn);

            while (true)
            {
                long key;
                if (!queue.TryDequeue(out key))
                {
                    break;
                }

                await RunStoredProc(conn, connection, key);
                LogConnectionState(connection, conn);
            }
        }

        Debug.WriteLine($"====== Connection[{connection}] is closed  ======");
    }

    /// <summary>
    /// Executes <c>[dbo].[sp]</c> on the supplied connection with the key bound to <c>@KEYKRT</c>.
    /// </summary>
    /// <param name="conn">Connection the command is executed on.</param>
    /// <param name="connection">Index of the calling worker; used only in the debug output.</param>
    /// <param name="scollNumbParam">Key passed to the stored procedure as a BigInt parameter.</param>
    public async Task RunStoredProc(SqlConnection conn, int connection, long scollNumbParam)
    {
        using (var cmd = new SqlCommand(@"[dbo].[sp]", conn))
        {
            cmd.CommandTimeout = 120;
            cmd.CommandType = CommandType.StoredProcedure;

            var keyParameter = new SqlParameter
            {
                ParameterName = "@KEYKRT",
                Value = scollNumbParam,
                SqlDbType = SqlDbType.BigInt
            };
            cmd.Parameters.Add(keyParameter);

            Debug.WriteLine($"Connection[{connection}] Start of Proccesing: {scollNumbParam}");
            await cmd.ExecuteNonQueryAsync();
            Debug.WriteLine($"Connection[{connection}] End of Proccesing: {scollNumbParam}");
        }
    }

    // Writes the client connection id and current state of a worker's connection to the debug output.
    private static void LogConnectionState(int connection, SqlConnection conn)
    {
        Debug.WriteLine($"Connection[{connection}]: {conn.ClientConnectionId}");
        Debug.WriteLine($"Connection[{connection}].State: {conn.State}");
    }
}
+2

( ). (: Max Pool Size=250;Connection Timeout=60;Connection Lifetime=0;MultipleActiveResultSets=true), .

  • ( )

  • ( )

  • ( ) (0 - )

: Max Pool Size ( 100) ( :))

, " MultipleActiveResultSets" 'MultipleActiveResultSets=true' , . (MARS)

. MARS, MARS . MARS . , . .

+1
source

Source: https://habr.com/ru/post/1681440/


All Articles