I accepted my parallel / consumer code implementation based on this question
class ParallelConsumer<T> : IDisposable { private readonly int _maxParallel; private readonly Action<T> _action; private readonly TaskFactory _factory = new TaskFactory(); private CancellationTokenSource _tokenSource; private readonly BlockingCollection<T> _entries = new BlockingCollection<T>(); private Task _task; public ParallelConsumer(int maxParallel, Action<T> action) { _maxParallel = maxParallel; _action = action; } public void Start() { try { _tokenSource = new CancellationTokenSource(); _task = _factory.StartNew( () => { Parallel.ForEach( _entries.GetConsumingEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token }, (item, loopState) => { Log("Taking" + item); if (!_tokenSource.IsCancellationRequested) { _action(item); Log("Finished" + item); } else { Log("Not Taking" + item); _entries.CompleteAdding(); loopState.Stop(); } }); }, _tokenSource.Token); } catch (OperationCanceledException oce) { System.Diagnostics.Debug.WriteLine(oce); } } private void Log(string message) { Console.WriteLine(message); } public void Stop() { Dispose(); } public void Enqueue(T entry) { Log("Enqueuing" + entry); _entries.Add(entry); } public void Dispose() { if (_task == null) { return; } _tokenSource.Cancel(); while (!_task.IsCanceled) { } _task.Dispose(); _tokenSource.Dispose(); _task = null; } }
And here is the test code
class Program { static void Main(string[] args) { TestRepeatedEnqueue(100, 1); } private static void TestRepeatedEnqueue(int itemCount, int parallelCount) { bool[] flags = new bool[itemCount]; var consumer = new ParallelConsumer<int>(parallelCount, (i) => { flags[i] = true; } ); consumer.Start(); for (int i = 0; i < itemCount; i++) { consumer.Enqueue(i); } Thread.Sleep(1000); Debug.Assert(flags.All(b => b == true)); } }
The test always fails - it always gets stuck around the 93rd item out of 100 tested. Any idea what part of my code caused this problem and how to fix it?
source share