Why does BufferBlock <T> .ReceiveAsync () freeze when data is available?

I am new to TPL data stream.

I am trying to create a throttle asynchronous update for a fairly fast stream flow. BufferBlock seemed a good match for this with the idea that I could call ReceiveAll () to grab everything from the buffer, and in cases where I can't wait for anything in ReceiveAsync () to get the next item as it arrives.

But sometimes it hangs on a call to ReceiveAsync (); and the conditions of failure are odd.

Pay attention . I am wondering why this freezes. I already found another way to make the application I'm working on, but it may not be as neat or extensible as I don't use the TPL data stream since I obviously don't understand how it works.

Further note . Using the key here is what I'm doing TryReceiveAll (), and then wait for ReceiveAsync () if that fails. This is a common template that receives the package data, and I want to process the data until the package is loaded. For this reason, I don’t want to just go in cycles in ReceiveAsync (), and therefore simple connection to ActionBlock or TransformBlock will not work. If I remove TryReceiveAll (), my version works as expected; although, as noted by other comments, the behavior is different from different people, so this may be a coincidence.

Here is a bad example ... drop it in a console application with a link to System.Threading.Tasks.Dataflow.dll and using:

using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

Bad example:

class Program
{
    static void Main(string[] args)
    {
        var context = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });
        var task = Task.Run(() =>ProcessBuffer(buffer, context.Token), context.Token);

        // shove 10 things onto the buffer
        for(int i = 0; i < 10; i++)
        {
            // shove something on the buffer every second
            buffer.Post(i);
            Thread.Sleep(1000);
        }
        context.Cancel();
    }

    // 1. We expect to see something hit the buffer and go immediately through on the TryReceiveAll
    // 2. Then we should see nothing for a second
    // 3. Then we immediately process the next element from the await ReceiveAsync.
    // 4. We should then repeat 2 & 3 'till i == 10 as there will be nothign for TryReceiveAll.
    static async Task ProcessBuffer(BufferBlock<int> buffer, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
                IList<int> elements;
                if (!buffer.TryReceiveAll(out elements))
                {
                    try
                    {
                        var element = await buffer.ReceiveAsync(TimeSpan.FromMilliseconds(5000), token);
                        elements = new List<int> { element };
                    }
                    catch (TimeoutException) { Console.WriteLine("Failed to get anything on await..."); }
                }
                if (elements != null) Console.WriteLine("Elements: " + string.Join(", ", elements));
            }
        }
        catch (Exception e) { Console.WriteLine("Exception in thread: " + e.ToString()); }
    }
}

:

11:27: This Breaks it...
Elements: 0
11:27: This Breaks it...
Failed to get anything on await...
11:27: This Breaks it...
Elements: 1, 2, 3, 4, 5

... Etc

But if I changed the log line

Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");

to

Console.WriteLine("This Works...");

The output comes out as expected:

This Works...
Elements: 0
This Works...
Elements: 1
This Works...
Elements: 2
This Works...

... .

?

+4

Source: https://habr.com/ru/post/1687546/


All Articles