Why is Task.WhenAny so slow when called many times against the same TaskCompletionSource?

If a class has a member TaskCompletionSource<TResult> m_tcs with a long lifetime, and Task.WhenAny is called with m_tcs.Task as one of its arguments, performance seems to deteriorate exponentially once the number of calls exceeds 50,000 or so.

Why is it so slow in this case? Could there be an alternative that works faster, but without using 4 times more memory?

My thought is that Task.WhenAny is probably adding and removing so many continuations to and from m_tcs.Task that, somewhere along the way, this leads to O(N²) complexity.

I found a more efficient alternative (WhenAnyMemberTcsIndirect) by wrapping the TCS in an async function that awaits m_tcs.Task. It uses about 4x the memory, but it is much faster beyond 20,000 iterations or so.

Sample code is below (for accurate results, compile and run the .exe directly, without a debugger attached). Note that WhenAnyMemberTcsDirect has the performance problem, WhenAnyMemberTcsIndirect is the faster alternative, and WhenAnyLocalTcs is the baseline for comparison:

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

/// <summary>
/// Demonstration type for the benchmark: exposes three ways of racing a caller-supplied
/// task against a <see cref="TaskCompletionSource{TResult}"/> via <see cref="Task.WhenAny(Task[])"/>.
/// </summary>
public class WithTcs
{
    // Shared by every call on this instance; nothing in this class ever completes it.
    private readonly TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();

    /// <summary>
    /// Races <paramref name="task"/> directly against the shared member task.
    /// This is the variant reported as degrading badly (roughly O(N^2)) for large call counts.
    /// </summary>
    /// <param name="task">The task to race against the member task.</param>
    /// <returns>A task that completes, or rethrows, with whichever task finished first.</returns>
    public async Task WhenAnyMemberTcsDirectAsync(Task task)
    {
        Task winner = await Task.WhenAny(task, m_tcs.Task).ConfigureAwait(false);

        // Awaiting the winner propagates its exception or cancellation, if any.
        await winner;
    }

    /// <summary>
    /// Races <paramref name="task"/> against a fresh async wrapper around the shared member task.
    /// Reported as scaling linearly, at the cost of roughly 4x the memory.
    /// </summary>
    /// <param name="task">The task to race against the wrapped member task.</param>
    /// <returns>A task that completes, or rethrows, with whichever task finished first.</returns>
    public async Task WhenAnyMemberTcsIndirectAsync(Task task)
    {
        Task<bool> wrapped = AwaitTcsTaskAsync(m_tcs);
        Task winner = await Task.WhenAny(task, wrapped).ConfigureAwait(false);
        await winner;
    }

    // Produces a new task per call that simply awaits the given source's task.
    private async Task<TResult> AwaitTcsTaskAsync<TResult>(TaskCompletionSource<TResult> tcs)
    {
        TResult outcome = await tcs.Task.ConfigureAwait(false);
        return outcome;
    }

    /// <summary>
    /// Baseline for comparison: races <paramref name="task"/> against the task of a
    /// short-lived, per-call <see cref="TaskCompletionSource{TResult}"/>.
    /// </summary>
    /// <param name="task">The task to race against the local task.</param>
    /// <returns>A task that completes, or rethrows, with whichever task finished first.</returns>
    public async Task WhenAnyLocalTcsAsync(Task task)
    {
        var localSource = new TaskCompletionSource<bool>();
        Task winner = await Task.WhenAny(task, localSource.Task).ConfigureAwait(false);
        await winner;
    }
}

/// <summary>
/// Console benchmark that runs each <c>WithTcs.WhenAny*Async</c> variant for a fixed number of
/// iterations and prints the elapsed time and the change in managed memory.
/// </summary>
class Program
{
    // Target number of progress dots per test run (presumably chosen to fit an 80-column console).
    private const int ProgressDotCount = 78;

    static void Main(string[] args)
    {
        ShowWarningIfDebuggerAttached();

        // Blocking is intentional: this is the synchronous console entry point.
        MainAsync().GetAwaiter().GetResult();

        ShowWarningIfDebuggerAttached();
        Console.ReadLine();
    }

    /// <summary>
    /// Runs the three benchmarks in sequence, pausing one second between runs.
    /// </summary>
    static async Task MainAsync()
    {
        const int n = 100000;

        Console.WriteLine("Running Task.WhenAny tests ({0:#,0} iterations)", n);
        Console.WriteLine();

        await WhenAnyLocalTcs(n).ConfigureAwait(false);

        await Task.Delay(1000).ConfigureAwait(false);

        await WhenAnyMemberTcsIndirect(n).ConfigureAwait(false);

        await Task.Delay(1000).ConfigureAwait(false);

        await WhenAnyMemberTcsDirect(n).ConfigureAwait(false);
    }

    // NOTE: the three wrappers below must remain separate methods; RunTestAsync uses
    // [CallerMemberName] to print the wrapper's name as the test title.

    /// <summary>Benchmarks the short-lived (local) TaskCompletionSource baseline.</summary>
    static Task WhenAnyLocalTcs(int n)
    {
        Func<WithTcs, Task, Task> function =
            (instance, task) => instance.WhenAnyLocalTcsAsync(task);

        return RunTestAsync(n, function);
    }

    /// <summary>Benchmarks the member TaskCompletionSource wrapped in an async method.</summary>
    static Task WhenAnyMemberTcsIndirect(int n)
    {
        Func<WithTcs, Task, Task> function =
            (instance, task) => instance.WhenAnyMemberTcsIndirectAsync(task);

        return RunTestAsync(n, function);
    }

    /// <summary>Benchmarks the member TaskCompletionSource passed directly to Task.WhenAny.</summary>
    static Task WhenAnyMemberTcsDirect(int n)
    {
        Func<WithTcs, Task, Task> function =
            (instance, task) => instance.WhenAnyMemberTcsDirectAsync(task);

        return RunTestAsync(n, function);
    }

    /// <summary>
    /// Invokes <paramref name="function"/> <paramref name="n"/> times against a single
    /// <c>WithTcs</c> instance, waits for all returned tasks, and prints the elapsed time
    /// and the difference in managed memory before and after.
    /// </summary>
    /// <param name="n">Number of iterations; must be non-negative.</param>
    /// <param name="function">The operation under test; receives the shared instance and a primary task.</param>
    /// <param name="name">Title printed for the run; defaults to the calling method's name.</param>
    static async Task RunTestAsync(int n, Func<WithTcs, Task, Task> function, [CallerMemberName] string name = "")
    {
        Console.WriteLine(name);

        var tasks = new Task[n];
        var sw = new Stopwatch();
        var startBytes = GC.GetTotalMemory(true);
        sw.Start();

        var instance = new WithTcs();

        // Clamp to at least 1: for n < ProgressDotCount the integer division yields 0,
        // and "iTemp % 0" would throw DivideByZeroException inside every primary task.
        var step = Math.Max(1, n / ProgressDotCount);
        for (int i = 0; i < n; i++)
        {
            var iTemp = i; // per-iteration copy so each closure sees its own index
            Task primaryTask = Task.Run(() => { if (iTemp % step == 0) Console.Write("."); });
            tasks[i] = function(instance, primaryTask);
        }

        await Task.WhenAll(tasks).ConfigureAwait(false);

        // Stop timing before the forced collection below so that GC.GetTotalMemory(true)
        // does not inflate the reported duration.
        sw.Stop();
        Console.WriteLine();

        var endBytes = GC.GetTotalMemory(true);

        // Keep the instance and the task array reachable until after the memory reading.
        GC.KeepAlive(instance);
        GC.KeepAlive(tasks);

        Console.WriteLine("  Time: {0,7:#,0} ms, Memory: {1,10:#,0} bytes", sw.ElapsedMilliseconds, endBytes - startBytes);
        Console.WriteLine();
    }

    /// <summary>
    /// Prints an upper-cased warning when a debugger is attached.
    /// </summary>
    static void ShowWarningIfDebuggerAttached()
    {
        if (Debugger.IsAttached)
        {
            // Invariant casing keeps the message stable regardless of the current culture.
            Console.WriteLine("WARNING: running with the debugger attached may result in inaccurate results\r\n".ToUpperInvariant());
        }
    }
}

Examples of results:

Iterations | WhenAny* Method   | Time (ms) | Memory (bytes)
---------: | ----------------- | --------: | -------------:
     1,000 | LocalTcs          |        21 |         58,248
     1,000 | MemberTcsIndirect |        54 |        217,268
     1,000 | MemberTcsDirect   |        21 |         52,496
    10,000 | LocalTcs          |        91 |        545,836
    10,000 | MemberTcsIndirect |        98 |      2,141,836
    10,000 | MemberTcsDirect   |       140 |        545,640
   100,000 | LocalTcs          |       210 |      4,898,512
   100,000 | MemberTcsIndirect |       502 |     21,426,316
   100,000 | MemberTcsDirect   |    14,090 |      5,085,396
   200,000 | LocalTcs          |       366 |      9,630,872
   200,000 | MemberTcsIndirect |       659 |     41,450,916
   200,000 | MemberTcsDirect   |    42,599 |     10,069,248
   500,000 | LocalTcs          |       808 |     23,670,492
   500,000 | MemberTcsIndirect |     1,906 |     97,339,192
   500,000 | MemberTcsDirect   |   288,373 |     24,968,436
 1,000,000 | LocalTcs          |     1,642 |     47,272,744
 1,000,000 | MemberTcsIndirect |     3,149 |    200,480,888
 1,000,000 | MemberTcsDirect   | 1,268,030 |     48,064,772

Test platform: .NET 4.6.2, Release build (no debugger attached), Windows 7 SP1 64-bit, Intel Core i7-4770.

+4
1

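There is an alternative that performs well and scales linearly (O(N)). It adds a CancellationTokenSource m_cts alongside the TaskCompletionSource. Whenever m_tcs is set to cancel/faulted/result, m_cts.Cancel() must be called as well. Each call then waits on its own short-lived TaskCompletionSource, which a callback registered on m_cts.Token completes from the state of m_tcs.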

Code:

public class WithTcs
{
    // ... same as above, plus below

    // Source of the token that triggers forwarding of m_tcs's outcome.
    // NOTE(review): nothing in this snippet cancels it; presumably whoever completes
    // m_tcs also calls m_cts.Cancel() — confirm against the owning code.
    private readonly CancellationTokenSource m_cts = new CancellationTokenSource();

    /// <summary>
    /// Races <paramref name="task"/> against a per-call task that is completed from
    /// <c>m_tcs</c> when <c>m_cts</c> is cancelled.
    /// </summary>
    /// <param name="task">The task to race against the per-call task.</param>
    /// <returns>A task that completes, or rethrows, with whichever task finished first.</returns>
    public async Task WhenAnyMemberCtsAsync(Task task)
    {
        CancellationToken token = m_cts.Token;
        var proxy = new TaskCompletionSource<bool>();

        // The registration is disposed when the block exits, so the callback does not
        // stay attached to the token after this call has finished.
        using (token.Register(() => proxy.TrySetFrom(m_tcs)))
        {
            Task winner = await Task.WhenAny(task, proxy.Task).ConfigureAwait(false);
            await winner;
        }
    }
}

/// <summary>
/// Extension helpers for <see cref="TaskCompletionSource{TResult}"/>.
/// </summary>
public static class TcsExtensions
{
    /// <summary>
    /// Attempts to copy the final state (result, exception(s) or cancellation) of
    /// <paramref name="source"/> onto <paramref name="dest"/>.
    /// </summary>
    /// <typeparam name="TResult">Result type shared by both completion sources.</typeparam>
    /// <param name="dest">The completion source to complete.</param>
    /// <param name="source">The completion source whose task state is copied.</param>
    /// <returns>
    /// The value returned by the matching <c>TrySet*</c> call on <paramref name="dest"/>,
    /// or <c>false</c> when <paramref name="source"/> has not reached a final state.
    /// </returns>
    public static bool TrySetFrom<TResult>(this TaskCompletionSource<TResult> dest, TaskCompletionSource<TResult> source)
    {
        Task<TResult> origin = source.Task;
        TaskStatus state = origin.Status; // read once; the branches below act on this snapshot

        if (state == TaskStatus.RanToCompletion)
        {
            return dest.TrySetResult(origin.Result);
        }

        if (state == TaskStatus.Faulted)
        {
            // Forward the inner exceptions rather than the wrapping AggregateException.
            return dest.TrySetException(origin.Exception.InnerExceptions);
        }

        if (state == TaskStatus.Canceled)
        {
            return dest.TrySetCanceled();
        }

        // Any other status means the source task has not finished yet.
        return false;
    }
}

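With this approach, continuations are no longer attached to the long-lived m_tcs.Task on every call, unlike WhenAnyMemberTcsDirect, which is O(N²).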

+1

Source: https://habr.com/ru/post/1681999/


All Articles