Producer / Hybrid Consumer in C # Using Frame 4.0 Classes and Collection Lock

I have a situation where I have a producer / consumer scenario. The manufacturer never stops, which means that even if there is a time when there are no elements in BC, additional elements can be added later.

Moving from the .NET Framework 3.5 to 4.0, I decided to use it BlockingCollectionas a parallel queue between the consumer and the manufacturer. I even added some parallel extensions, so I could use BC with Parallel.ForEach.

The problem is that in the consumer stream I need a hybrid model:

  • I always check BC to process any item that arrived with Parallel.ForEach(bc.GetConsumingEnumerable(), item => etc
  • Inside this, foreachI perform all tasks that are independent of each other.
  • That is the problem. After parallelizing the previous tasks, I need to manage their results in the same FIFO order in which they were in BC. Processing of these results should be carried out in a synchronization stream.

The following is a small example in pseudo code:

manufacturer:

//This event is triggered each time a page is scanned. Any batch of new pages can be added at any time at the scanner
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{          
     //The object to add has a property with the sequence number
    _concurrentCollection.TryAdd(scannedPage);
}

Consumer

private void Init()
{
    _cancelTasks = false;
    _checkTask = Task.Factory.StartNew(() =>
            {
                while (!_cancelTasks)
                {
                    //BlockingCollections with Paralell ForEach
                    var bc = _concurrentCollection;
                    Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
                    {
                        ScannedPage currentPage = item;
                        // process a batch of images from the bc and check if an image has a valid barcode. T
                    });
                    //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.

                }
            });
}

Obviously, this work does not work, because the block is .GetConsumingEnumerable()blocked until there is no other element in BC. I would like to do this with tasks and just run 4 or 5 tasks in one batch, but:

  • , , BC ( , . - BC, 4 TryTake , , , , , BC , , , BC, 4 )?
  • , Parallel.For?
  • FIFO, BC?
  • - concurrency, ?
  • , , - StackOverflow, , - , , .
+4
1

, , , ConcurrentBag :

while (!_cancelTasks)
{
   //BlockingCollections with Paralell ForEach
   var bc = _concurrentCollection;
   var q = new ConcurrentBag<ScannedPage>();
   Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
   {
      ScannedPage currentPage = item;
      q.Add(item);
      // process a batch of images from the bc and check if an image has a valid barcode. T
   });
 //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.


  //process items in your list here by sorting using some sequence key
  var items = q.OrderBy( o=> o.SeqNbr).ToList();
  foreach( var item in items){
     ...
  }
}

, BC, nbr ScannedPage, , .

:

ScannedPage:

public static int _counter;  //public because this is just an example but it would work.

nbr :

private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{          
    lock( this){   //to single thread this process.. not necessary if it already single threaded of course.
    System.Threading.Interlocked.Increment( ref ScannedPage._counter);
    scannedPage.SeqNbr = ScannedPage._counter;
    ...
    }
}
+2

Source: https://habr.com/ru/post/1589077/


All Articles