How to wrap a ConcurrentDictionary in a BlockingCollection?

I am trying to implement a ConcurrentDictionary by wrapping it in a BlockingCollection, but it doesn't seem to be successful.

I understand that one variable declaration works with BlockingCollection, for example ConcurrentBag<T> , ConcurrentQueue<T> , etc.

So, to create a ConcurrentBag wrapped in a BlockingCollection, I declare and instantiate it like this:

 BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>()); 

but how to do it for ConcurrentDictionary? I need the BlockingCollection blocking functionality on both the producer and consumer side.

+6
source share
2 answers

Maybe you need a parallel blockingCollection dictionary

  ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>(); int maxBoxes = 5; CancellationTokenSource cancelationTokenSource = new CancellationTokenSource(); CancellationToken cancelationToken = cancelationTokenSource.Token; Random rnd = new Random(); // Producer Task.Factory.StartNew(() => { while (true) { int index = rnd.Next(0, maxBoxes); // put the letter in the mailbox 'index' var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); box.Add("some message " + index, cancelationToken); Console.WriteLine("Produced a letter to put in box " + index); // Wait simulating a heavy production item. Thread.Sleep(1000); } }); // Consumer 1 Task.Factory.StartNew(() => { while (true) { int index = rnd.Next(0, maxBoxes); // get the letter in the mailbox 'index' var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); var message = box.Take(cancelationToken); Console.WriteLine("Consumed 1: " + message); // consume a item cost less than produce it: Thread.Sleep(50); } }); // Consumer 2 Task.Factory.StartNew(() => { while (true) { int index = rnd.Next(0, maxBoxes); // get the letter in the mailbox 'index' var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); var message = box.Take(cancelationToken); Console.WriteLine("Consumed 2: " + message); // consume a item cost less than produce it: Thread.Sleep(50); } }); Console.ReadLine(); cancelationTokenSource.Cancel(); 

Thus, a consumer who is expecting something in mailbox 5 will wait until the product sends a letter to mailbox 5.

+4
source

You will need to write your own adapter class - something like:

  public class ConcurrentDictionaryWrapper<TKey,TValue> : IProducerConsumerCollection<KeyValuePair<TKey,TValue>> { private ConcurrentDictionary<TKey, TValue> dictionary; public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() { return dictionary.GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } public void CopyTo(Array array, int index) { throw new NotImplementedException(); } public int Count { get { return dictionary.Count; } } public object SyncRoot { get { return this; } } public bool IsSynchronized { get { return true; } } public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index) { throw new NotImplementedException(); } public bool TryAdd(KeyValuePair<TKey, TValue> item) { return dictionary.TryAdd(item.Key, item.Value); } public bool TryTake(out KeyValuePair<TKey, TValue> item) { item = dictionary.FirstOrDefault(); TValue value; return dictionary.TryRemove(item.Key, out value); } public KeyValuePair<TKey, TValue>[] ToArray() { throw new NotImplementedException(); } } 
+1
source

Source: https://habr.com/ru/post/916550/


All Articles