Deadlock when using BlockingCollection<T> and TPL Dataflow together

I wrote a test sample that reproduces the problem. This is not my actual code; I tried to write a minimal repro. If you increase the bounded capacity to the number of iterations, which effectively removes the limit, it does not deadlock, and if you set the max degree of parallelism to a small number like 1, it does not deadlock either.

Again, I know the code below is contrived, but the code where I actually found this was much larger and harder to understand. Basically, there was a blocking object pool of connections to a remote resource, and several blocks in the flow used a connection.

Any ideas on how to solve this? At first glance, it looks like a problem with Dataflow. When I break in to look at the threads, I see many threads blocked in Add and 0 threads blocked in Take. There are several items in addBlock's output queue that have not yet propagated to takeBlock, so it appears to be stuck or deadlocked.

    var blockingCollection = new BlockingCollection<int>(10000);

    var takeBlock = new ActionBlock<int>((i) =>
    {
        int j = blockingCollection.Take();

    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20,
              SingleProducerConstrained = true
           });

    var addBlock = new TransformBlock<int, int>((i) => 
    {
        blockingCollection.Add(i);
        return i;

    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20
           });

    addBlock.LinkTo(takeBlock, new DataflowLinkOptions()
          {
             PropagateCompletion = true
          });

    for (int i = 0; i < 100000; i++)
    {
        addBlock.Post(i);
    }

    addBlock.Complete();
    await addBlock.Completion;
    await takeBlock.Completion;
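
To check the observation above without breaking into the debugger, a small monitor along these lines could log the queue sizes while the repro runs. This is only a sketch: `PipelineWatchdog`, `Snapshot`, and `MonitorAsync` are hypothetical names that are not part of the original code. It relies only on the `Count`, `InputCount`, and `OutputCount` properties of the collection and the blocks.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper (not from the original post) that reports queue sizes.
public static class PipelineWatchdog
{
    // Formats one snapshot of the collection and block queue sizes.
    public static string Snapshot(
        BlockingCollection<int> collection,
        TransformBlock<int, int> addBlock,
        ActionBlock<int> takeBlock)
    {
        return $"collection={collection.Count} " +
               $"addBlock.Input={addBlock.InputCount} " +
               $"addBlock.Output={addBlock.OutputCount} " +
               $"takeBlock.Input={takeBlock.InputCount}";
    }

    // Prints a snapshot every interval until the pipeline completes.
    public static async Task MonitorAsync(
        BlockingCollection<int> collection,
        TransformBlock<int, int> addBlock,
        ActionBlock<int> takeBlock,
        TimeSpan interval)
    {
        Task done = takeBlock.Completion;
        while (await Task.WhenAny(done, Task.Delay(interval)) != done)
        {
            Console.WriteLine(Snapshot(collection, addBlock, takeBlock));
        }
    }
}
```

Starting `PipelineWatchdog.MonitorAsync(blockingCollection, addBlock, takeBlock, TimeSpan.FromSeconds(1))` before the posting loop should show the numbers stop changing once the pipeline stalls, with `addBlock.Output` staying above zero if items are indeed held back from takeBlock.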
1 answer

TPL Dataflow was not designed for use with code that blocks a lot, and I think this problem stems from that.

One way around it is to replace the BlockingCollection with Dataflow's BufferBlock and use its asynchronous methods. For example:

    var bufferBlock = new BufferBlock<int>(
        new DataflowBlockOptions { BoundedCapacity = 10000 });

    var takeBlock = new ActionBlock<int>(
        async i =>
        {
            int j = await bufferBlock.ReceiveAsync();
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 20,
            SingleProducerConstrained = true
        });

    var addBlock = new TransformBlock<int, int>(
        async i =>
        {
            await bufferBlock.SendAsync(i);
            return i;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 20
        });
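
For completeness, the two blocks above can be combined with the linking, posting, and completion steps from the question into one program. The following is a sketch under assumptions, not a definitive implementation: the class name `BufferBlockPipeline`, the method `RunAsync`, and the `received` counter are hypothetical additions used only to make the result observable.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper around the BufferBlock-based pipeline.
public static class BufferBlockPipeline
{
    // Posts itemCount items and returns how many the consumer received.
    public static async Task<int> RunAsync(int itemCount, int capacity)
    {
        var bufferBlock = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = capacity });

        int received = 0; // assumption: a counter added for illustration

        var takeBlock = new ActionBlock<int>(
            async i =>
            {
                // Waits asynchronously, so no thread is blocked while the buffer is empty.
                await bufferBlock.ReceiveAsync();
                Interlocked.Increment(ref received);
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 20,
                SingleProducerConstrained = true
            });

        var addBlock = new TransformBlock<int, int>(
            async i =>
            {
                // Waits asynchronously while the buffer is at its bounded capacity.
                await bufferBlock.SendAsync(i);
                return i;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 20 });

        addBlock.LinkTo(takeBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < itemCount; i++)
        {
            addBlock.Post(i);
        }

        addBlock.Complete();
        await takeBlock.Completion;
        return received;
    }

    public static async Task Main()
    {
        int received = await RunAsync(itemCount: 100000, capacity: 10000);
        Console.WriteLine($"Received {received} items");
    }
}
```

Because `SendAsync` and `ReceiveAsync` return tasks instead of blocking, the 20 workers of each block do not tie up threads while they wait, which is the property the suggestion above relies on.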



Source: https://habr.com/ru/post/1542222/

