Is there anything like an asynchronous BlockingCollection<T>?

I would like to await the result of BlockingCollection<T>.Take() asynchronously, so that I do not block the thread. I am looking for something like this:

 var item = await blockingCollection.TakeAsync(); 

I know I can do this:

 var item = await Task.Run(() => blockingCollection.Take()); 

but this rather defeats the whole idea, because another thread (from the ThreadPool) gets blocked instead.

Is there an alternative?

+70
collections c# task-parallel-library async-await
Jan 20 '14 at 2:30
4 answers

There are four options that I know of.

The first is Channels (System.Threading.Channels), which provide a thread-safe queue that supports asynchronous read and write operations. Channels are highly optimized and can optionally drop items when a capacity threshold is reached.
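
As a rough illustration of the Channels option, here is a minimal sketch rather than a definitive implementation. The class name ChannelSketch, the method name RunAsync, the capacity of 4, and the item count are assumptions made for this example. ReadAsync plays the role of the hypothetical TakeAsync from the question, and Writer.Complete() is the counterpart of CompleteAdding().

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelSketch
{
    // Produces the numbers 1..10 on one task and consumes them without blocking a thread.
    public static async Task<List<int>> RunAsync()
    {
        // Bounded channel: when it is full, WriteAsync waits asynchronously.
        // Other BoundedChannelFullMode values (DropOldest, DropNewest, DropWrite) drop items instead.
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(4)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        var producer = Task.Run(async () =>
        {
            for (int i = 1; i <= 10; i++)
                await channel.Writer.WriteAsync(i);
            channel.Writer.Complete(); // no more items will be written
        });

        var received = new List<int>();

        // Asynchronous "take" of a single item.
        received.Add(await channel.Reader.ReadAsync());

        // Drain the rest; WaitToReadAsync returns false once the writer
        // has completed and the channel is empty.
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out int item))
                received.Add(item);
        }

        await producer;
        return received;
    }
}
```

With a single consumer the items come back in the order they were written. On .NET Framework this assumes the System.Threading.Channels package is referenced.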

Next up is BufferBlock<T> from TPL Dataflow. If you have only a single consumer, you can use OutputAvailableAsync or ReceiveAsync, or just link it to an ActionBlock<T>. For more information, see my blog.
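
The single-consumer pattern with BufferBlock<T> might look roughly like the sketch below. The names BufferBlockSketch and RunAsync and the item count are assumptions for illustration only. Pairing OutputAvailableAsync with ReceiveAsync this way is only safe with one consumer, because a second consumer could take the item between the two calls.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockSketch
{
    // Sends 1..5 into a BufferBlock and receives them asynchronously.
    public static async Task<List<int>> RunAsync()
    {
        var buffer = new BufferBlock<int>();

        var producer = Task.Run(async () =>
        {
            for (int i = 1; i <= 5; i++)
                await buffer.SendAsync(i);
            buffer.Complete(); // signals that no more items will arrive
        });

        var received = new List<int>();

        // OutputAvailableAsync completes with false once the block
        // is completed and empty, which ends the loop.
        while (await buffer.OutputAvailableAsync())
        {
            received.Add(await buffer.ReceiveAsync());
        }

        await producer;
        return received;
    }
}
```

Depending on the target framework, this may require referencing the System.Threading.Tasks.Dataflow package.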

The last two are types that I created, available in my AsyncEx library.

AsyncCollection<T> is the async near-equivalent of BlockingCollection<T>, capable of wrapping a concurrent producer/consumer collection such as ConcurrentQueue<T> or ConcurrentBag<T>. You can use TakeAsync to asynchronously consume items from the collection. For more information, see my blog.

AsyncProducerConsumerQueue<T> is a more portable async-compatible producer/consumer queue. You can use DequeueAsync to asynchronously consume items from the queue. For more information, see my blog.

The last three of these alternatives allow both synchronous and asynchronous puts and takes.

+73
Jan 20 '14 at 3:45

... or you can do this:

    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    public class AsyncQueue<T>
    {
        // The semaphore count tracks how many items are available to dequeue.
        private readonly SemaphoreSlim _sem;
        private readonly ConcurrentQueue<T> _que;

        public AsyncQueue()
        {
            _sem = new SemaphoreSlim(0);
            _que = new ConcurrentQueue<T>();
        }

        public void Enqueue(T item)
        {
            _que.Enqueue(item);
            _sem.Release();
        }

        public void EnqueueRange(IEnumerable<T> source)
        {
            var n = 0;
            foreach (var item in source)
            {
                _que.Enqueue(item);
                n++;
            }
            // Release(0) throws ArgumentOutOfRangeException, so skip empty input.
            if (n > 0)
                _sem.Release(n);
        }

        public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
        {
            for (; ; )
            {
                // Waits asynchronously until an item has been enqueued.
                await _sem.WaitAsync(cancellationToken);

                T item;
                if (_que.TryDequeue(out item))
                {
                    return item;
                }
            }
        }
    }

Simple, fully functional asynchronous FIFO queue.

Note: SemaphoreSlim.WaitAsync was added in .NET 4.5; before that, this was not so straightforward.

+16
Feb 24 '18 at 14:15

If you do not mind a little hack, try these extensions.

    // Polling hack: try without blocking, otherwise wait 100 ms asynchronously and retry.
    public static async Task AddAsync<TEntity>(
        this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt)
    {
        while (true)
        {
            if (Bc.TryAdd(item, 0, abortCt))
                return;
            else
                await Task.Delay(100, abortCt);
        }
    }

    public static async Task<TEntity> TakeAsync<TEntity>(
        this BlockingCollection<TEntity> Bc, CancellationToken abortCt)
    {
        while (true)
        {
            TEntity item;
            if (Bc.TryTake(out item, 0, abortCt))
                return item;
            else
                await Task.Delay(100, abortCt);
        }
    }

+2
Aug 29 '19 at 10:44

Here is a very basic implementation of a BlockingCollection that supports awaiting, with lots of missing features. It uses the AsyncEnumerable library, which makes asynchronous enumeration possible for C# versions prior to 8.0.

    public class AsyncBlockingCollection<T>
    {
        // Missing features: cancellation, boundedCapacity, TakeAsync
        private Queue<T> _queue = new Queue<T>();
        private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
        private int _consumersCount = 0;
        private bool _isAddingCompleted;

        public void Add(T item)
        {
            lock (_queue)
            {
                if (_isAddingCompleted) throw new InvalidOperationException();
                _queue.Enqueue(item);
            }
            _semaphore.Release();
        }

        public void CompleteAdding()
        {
            lock (_queue)
            {
                if (_isAddingCompleted) return;
                _isAddingCompleted = true;
                if (_consumersCount > 0) _semaphore.Release(_consumersCount);
            }
        }

        public IAsyncEnumerable<T> GetConsumingEnumerable()
        {
            lock (_queue) _consumersCount++;
            return new AsyncEnumerable<T>(async yield =>
            {
                while (true)
                {
                    lock (_queue)
                    {
                        if (_queue.Count == 0 && _isAddingCompleted) break;
                    }
                    await _semaphore.WaitAsync();
                    bool hasItem;
                    T item = default;
                    lock (_queue)
                    {
                        hasItem = _queue.Count > 0;
                        if (hasItem) item = _queue.Dequeue();
                    }
                    if (hasItem) await yield.ReturnAsync(item);
                }
            });
        }
    }

Usage example:

    var abc = new AsyncBlockingCollection<int>();
    var producer = Task.Run(async () =>
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(100);
            abc.Add(i);
        }
        abc.CompleteAdding();
    });
    var consumer = Task.Run(async () =>
    {
        await abc.GetConsumingEnumerable().ForEachAsync(async item =>
        {
            await Task.Delay(200);
            await Console.Out.WriteAsync(item + " ");
        });
    });
    await Task.WhenAll(producer, consumer);

Output:

1 2 3 4 5 6 7 8 9 10




Update: With the release of C# 8, asynchronous enumeration has become a built-in language feature. The required types (IAsyncEnumerable<T>, IAsyncEnumerator<T>) are built into .NET Core 3.0 and are offered as a package for .NET Framework 4.6.1+ (Microsoft.Bcl.AsyncInterfaces).

Here is an alternative implementation of GetConsumingEnumerable with the new C# 8 syntax:

    public async IAsyncEnumerable<T> GetConsumingEnumerable()
    {
        lock (_queue) _consumersCount++;
        while (true)
        {
            lock (_queue)
            {
                if (_queue.Count == 0 && _isAddingCompleted) break;
            }
            await _semaphore.WaitAsync();
            bool hasItem;
            T item = default;
            lock (_queue)
            {
                hasItem = _queue.Count > 0;
                if (hasItem) item = _queue.Dequeue();
            }
            if (hasItem) yield return item;
        }
    }

Note the coexistence of await and yield in the same method.

Usage example (C# 8):

    var consumer = Task.Run(async () =>
    {
        await foreach (var item in abc.GetConsumingEnumerable())
        {
            await Task.Delay(200);
            await Console.Out.WriteAsync(item + " ");
        }
    });

Note the await before the foreach.

+1
Apr 30 '19 at 5:43


