The reason your pipeline hangs is that both BufferBlock and TransformBlock apparently do not complete until they have been emptied of their items (I assume this is the intended behavior of an IPropagatorBlock, although I did not find documentation on it).
This can be verified with a more minimal example:
var bufferBlock = new BufferBlock<int>();
bufferBlock.Post(0);
bufferBlock.Complete();
bufferBlock.Completion.Wait();
This blocks indefinitely unless you add bufferBlock.Receive(); before waiting on the completion.
If you remove the items from your pipeline before blocking, whether with your TryReceiveAll code, by linking another ActionBlock to the pipeline, by converting your TransformBlock to an ActionBlock, or in any other way, it will no longer block.
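One of these options, linking an ActionBlock to the end of the pipeline, might look roughly like the sketch below. The names PipelineSketch, Run, buffer, square and sink are made up for illustration and are not from your code; the squaring delegate is only an assumption about what your TransformBlock does. With PropagateCompletion set on the links, the ActionBlock drains the TransformBlock's output, so waiting on the last block's Completion should return.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Hypothetical helper: pushes inputs through buffer -> transform -> action
    // and returns whatever the final ActionBlock consumed.
    public static List<int> Run(IEnumerable<int> inputs)
    {
        var results = new List<int>();

        var buffer = new BufferBlock<int>();
        var square = new TransformBlock<int, int>(i => i * i);
        // The ActionBlock consumes the TransformBlock's output, so nothing
        // is left sitting in an output queue. Default parallelism is 1,
        // so adding to the list here is not concurrent.
        var sink = new ActionBlock<int>(x => results.Add(x));

        // Propagate completion down the chain so completing the first
        // block eventually completes the last one.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        buffer.LinkTo(square, link);
        square.LinkTo(sink, link);

        foreach (var n in inputs)
        {
            buffer.Post(n);
        }

        buffer.Complete();
        // Wait on the last block only; it completes once everything upstream
        // has been drained and processed.
        sink.Completion.Wait();
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(new[] { 1, 2, 3 })));
    }
}
```

Waiting on the last block rather than on the BufferBlock or TransformBlock is the point here: the terminal block has no output queue that somebody else must empty.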
As for your specific solution, it seems you do not need a BufferBlock or a TransformBlock at all, since blocks have their own input queue and you are not using the return value of the TransformBlock. This can be achieved with just an ActionBlock:
var block = new ActionBlock<int>(
    i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        Console.WriteLine("x² = {0}", (int)Math.Pow(i, 2));
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

foreach (var number in Enumerable.Range(1, 1000))
{
    block.Post(number);
}

block.Complete();
block.Completion.Wait();