Parallel .ForEach stalled when merged with BlockingCollection

I accepted my parallel / consumer code implementation based on this question

class ParallelConsumer<T> : IDisposable { private readonly int _maxParallel; private readonly Action<T> _action; private readonly TaskFactory _factory = new TaskFactory(); private CancellationTokenSource _tokenSource; private readonly BlockingCollection<T> _entries = new BlockingCollection<T>(); private Task _task; public ParallelConsumer(int maxParallel, Action<T> action) { _maxParallel = maxParallel; _action = action; } public void Start() { try { _tokenSource = new CancellationTokenSource(); _task = _factory.StartNew( () => { Parallel.ForEach( _entries.GetConsumingEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token }, (item, loopState) => { Log("Taking" + item); if (!_tokenSource.IsCancellationRequested) { _action(item); Log("Finished" + item); } else { Log("Not Taking" + item); _entries.CompleteAdding(); loopState.Stop(); } }); }, _tokenSource.Token); } catch (OperationCanceledException oce) { System.Diagnostics.Debug.WriteLine(oce); } } private void Log(string message) { Console.WriteLine(message); } public void Stop() { Dispose(); } public void Enqueue(T entry) { Log("Enqueuing" + entry); _entries.Add(entry); } public void Dispose() { if (_task == null) { return; } _tokenSource.Cancel(); while (!_task.IsCanceled) { } _task.Dispose(); _tokenSource.Dispose(); _task = null; } } 

And here is the test code

 class Program { static void Main(string[] args) { TestRepeatedEnqueue(100, 1); } private static void TestRepeatedEnqueue(int itemCount, int parallelCount) { bool[] flags = new bool[itemCount]; var consumer = new ParallelConsumer<int>(parallelCount, (i) => { flags[i] = true; } ); consumer.Start(); for (int i = 0; i < itemCount; i++) { consumer.Enqueue(i); } Thread.Sleep(1000); Debug.Assert(flags.All(b => b == true)); } } 

The test always fails - it always gets stuck around the 93rd item out of 100 tested. Any idea what part of my code caused this problem and how to fix it?

+6
source share
2 answers

You cannot use Parallel.Foreach() with BlockingCollection.GetConsumingEnumerable() as you discovered.

See this blog post for clarification:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

This blog also provides source code for a method called GetConsumingPartitioner() , which you can use to solve the problem.

Excerpt from the blog:

BlockingCollections The GetConsumingEnumerable function uses the internal BlockingCollections synchronization, which already supports several consumers at the same time, but ForEach does not know this, and its enumerated partition logic should also block access to the enumerated.

Thus, there is more synchronization than it actually is, which leads to a potentially inattentive result.

[Also] the partitioning algorithm used by default, both Parallel.ForEach and PLINQ, use chunking to minimize synchronization costs: instead of blocking once per element, it will block, grab a group of elements (piece), and then release the lock.

Although this project can help with overall throughput, for scenarios that are more focused on low latency, this blocking can be prohibitive.

+8
source

The reason for the failure is due to the following reason described here.

Default Split Algorithm Used by Default Parallel .ForEach and PLINQ use chunking to minimize synchronization costs: instead of locking once per element, it will take a lock, take a group of elements (piece), and then release the lock.

To make it work, you can add a method to your ParallelConsumer<T> class to indicate that the addition is complete, as shown below

  public void StopAdding() { _entries.CompleteAdding(); } 

And now call this method after for loop as below

  consumer.Start(); for (int i = 0; i < itemCount; i++) { consumer.Enqueue(i); } consumer.StopAdding(); 

Otherwise, Parallel.ForEach() will wait for the threshold to be reached to capture the piece and begin processing.

+2
source

Source: https://habr.com/ru/post/950159/


All Articles