Why does BufferBlock<T>.ReceiveAsync() hang when data is available?
I am new to TPL Dataflow.
I am trying to create a throttled asynchronous update for a fairly fast input stream. BufferBlock seemed a good match for this, with the idea that I could call TryReceiveAll() to grab everything from the buffer and, when there is nothing to grab, await ReceiveAsync() to pick up the next item as it arrives.
But sometimes it hangs on the call to ReceiveAsync(), and the conditions under which it fails are odd.
Note: I am interested in why this hangs. I have already found another way to build the application I'm working on, but it may not be as neat or extensible, because it doesn't use TPL Dataflow, which I obviously don't understand well enough.
Further note: The key here is that I call TryReceiveAll() and then await ReceiveAsync() if that fails. This is a common pattern when data arrives in bursts and I want to process it as a batch. For this reason I don't want to just loop on ReceiveAsync(), and so simply linking to an ActionBlock or TransformBlock will not work. If I remove TryReceiveAll(), my version works as expected; although, as other comments have noted, different people see different behavior, so this may be a coincidence.
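The pattern described above (take everything that is buffered, otherwise wait for the next item) can be sketched on its own as follows. This is a minimal sketch and not the code that hangs: the BatchDrain class, the DrainAsync method and the onBatch callback are hypothetical names, and it waits with OutputAvailableAsync rather than ReceiveAsync, which is a substitution made here for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of TPL Dataflow.
static class BatchDrain
{
    // Hands each burst of buffered items to onBatch as a single list.
    public static async Task DrainAsync<T>(
        BufferBlock<T> buffer, Action<IList<T>> onBatch, CancellationToken token)
    {
        // OutputAvailableAsync yields true when at least one item is buffered
        // and false once the block has completed and has nothing left.
        // Cancelling the token is expected to surface as an
        // OperationCanceledException from this await.
        while (await buffer.OutputAvailableAsync(token))
        {
            IList<T> items;
            // Take everything that is buffered right now in one call.
            if (buffer.TryReceiveAll(out items))
            {
                onBatch(items);
            }
        }
    }
}
```

With this shape, calling buffer.Complete() ends the loop after the remaining items have been drained, so no timeout is needed to leave the wait.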
Here is a failing example. Drop it into a console application with a reference to System.Threading.Tasks.Dataflow.dll and these using directives:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
Bad example:
class Program
{
    static void Main(string[] args)
    {
        var context = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { CancellationToken = context.Token });
        var task = Task.Run(() => ProcessBuffer(buffer, context.Token), context.Token);

        // Shove 10 things onto the buffer.
        for (int i = 0; i < 10; i++)
        {
            // Shove something onto the buffer every second.
            buffer.Post(i);
            Thread.Sleep(1000);
        }
        context.Cancel();
    }

    // 1. We expect to see something hit the buffer and go immediately through on the TryReceiveAll.
    // 2. Then we should see nothing for a second.
    // 3. Then we immediately process the next element from the await ReceiveAsync.
    // 4. We should then repeat 2 & 3 until i == 10, as there will be nothing for TryReceiveAll.
    static async Task ProcessBuffer(BufferBlock<int> buffer, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
                IList<int> elements;
                if (!buffer.TryReceiveAll(out elements))
                {
                    try
                    {
                        var element = await buffer.ReceiveAsync(TimeSpan.FromMilliseconds(5000), token);
                        elements = new List<int> { element };
                    }
                    catch (TimeoutException) { Console.WriteLine("Failed to get anything on await..."); }
                }
                if (elements != null) Console.WriteLine("Elements: " + string.Join(", ", elements));
            }
        }
        catch (Exception e) { Console.WriteLine("Exception in thread: " + e.ToString()); }
    }
}
Output:
11:27: This Breaks it...
Elements: 0
11:27: This Breaks it...
Failed to get anything on await...
11:27: This Breaks it...
Elements: 1, 2, 3, 4, 5
... etc.
But if I change the log line
Console.WriteLine(DateTime.Now.ToShortTimeString() + ": This Breaks it...");
to
Console.WriteLine("This Works...");
The output comes out as expected:
This Works...
Elements: 0
This Works...
Elements: 1
This Works...
Elements: 2
This Works...
...
Why does the first version hang?