Cancelling BlockingCollection.GetConsumingEnumerable() and processing what's left

I have one process generating work and a second process, with a BlockingCollection<>, that consumes that work. When I close my program, I need my consumer to stop consuming work, but I still need to quickly log the work that was pending but had not been consumed.

Right now, my consumer spawns a thread that runs a foreach (<object> in BlockingCollection.GetConsumingEnumerable()). When I stop my program, my producer calls Consumer.BlockingCollection.CompleteAdding(). What I find is that my consumer continues to process everything in the queue.

Searching through related questions tells me that I need to use a CancellationToken. So I tried this:

    private void Process() // This method runs in a separate thread
    {
        try
        {
            foreach (var work in BlockingCollection.GetConsumingEnumerable(CancellationToken))
            {
                // Consume
            }
        }
        catch (OperationCanceledException)
        {
            foreach (var work in BlockingCollection.GetConsumingEnumerable())
            {
                // quickly log
            }
        }
    }

My producer:

    private CancellationTokenSource StopFlag = new CancellationTokenSource();

    MyConsumer.CancellationToken = StopFlag.Token;
    // Make the consumer spawn its consuming thread...

    StopFlag.Cancel();
    MyConsumer.BlockingCollection.CompleteAdding();

When I try this, I get no indication that an OperationCanceledException ever happened.

This question tries to explain how to use a cancellation token, but it does not seem to use it correctly. (One could argue that if it works, it is "correct enough".) And this question would be an exact duplicate of mine, but it has no example. (Same here.)

So, to reiterate: how do I properly use a CancellationToken with BlockingCollection.GetConsumingEnumerable(), with the caveat that after it is cancelled I need to process the remaining items in the queue using a different method?

(I think my problem centers on the proper use of the CancellationToken. None of my testing indicates that the process is actually being cancelled: StopFlag.IsCancellationRequested is always false.)

+6
2 answers

My problem was in how I was trying to cancel the operation. Instead of having my producer own the CancellationTokenSource, I put it all in the consumer.

    public class cProducer
    {
        private cConsumer myConsumer = new cConsumer();

        public void onStart() { myConsumer.OnStart(); }

        public void onStop() { myConsumer.OnStop(); }

        public void OnOrderReceived(cOrder newOrder)
        {
            myConsumer.orderQueue.Add(newOrder);
        }
    }

    public class cConsumer
    {
        private CancellationTokenSource stopFlag;
        public BlockingCollection<cOrder> orderQueue = new BlockingCollection<cOrder>();
        private Task processingTask;

        public void OnStart()
        {
            stopFlag = new CancellationTokenSource();
            processingTask = Task.Factory.StartNew(() => Process());
        }

        public void OnStop()
        {
            stopFlag.Cancel();
            orderQueue.CompleteAdding();
            processingTask.Wait();
        }

        private void Process()
        {
            try
            {
                foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable(stopFlag.Token))
                {
                    // process
                }
            }
            catch (OperationCanceledException)
            {
                // Drain whatever is still queued, without the token
                foreach (cOrder cancelledOrder in orderQueue.GetConsumingEnumerable())
                {
                    // log it
                }
            }
        }
    }
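To see the cancel-then-drain pattern from this answer in isolation, here is a minimal, self-contained sketch. The class name `CancelAndDrainDemo`, the `int` work items, and the two `ManualResetEventSlim` gates are assumptions added only to make the timing deterministic; they are not part of the original code. It relies on the documented behavior that `GetConsumingEnumerable(CancellationToken)` throws `OperationCanceledException` once the token is cancelled.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CancelAndDrainDemo
{
    // Returns the items that were processed normally and the items that
    // were only logged after cancellation.
    public static (List<int> Processed, List<int> Logged) Run()
    {
        var queue = new BlockingCollection<int>();
        var stopFlag = new CancellationTokenSource();
        var processed = new List<int>();
        var logged = new List<int>();

        // Hypothetical gates, used only to make the demo deterministic.
        var firstItemSeen = new ManualResetEventSlim(false);
        var allowContinue = new ManualResetEventSlim(false);

        for (int i = 1; i <= 5; i++) queue.Add(i);

        Task consumer = Task.Run(() =>
        {
            try
            {
                foreach (int item in queue.GetConsumingEnumerable(stopFlag.Token))
                {
                    processed.Add(item);
                    firstItemSeen.Set();
                    allowContinue.Wait(); // hold here until the stop request is made
                }
            }
            catch (OperationCanceledException)
            {
                // Drain the rest without a token; this loop ends because
                // CompleteAdding() has been called.
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    logged.Add(item);
                }
            }
        });

        firstItemSeen.Wait();    // the consumer is in the middle of item 1
        stopFlag.Cancel();       // same order as OnStop() in the answer
        queue.CompleteAdding();
        allowContinue.Set();
        consumer.Wait();

        return (processed, logged);
    }

    public static void Main()
    {
        var (processed, logged) = Run();
        Console.WriteLine("processed: " + string.Join(",", processed));
        Console.WriteLine("logged:    " + string.Join(",", logged));
    }
}
```

In this sketch the item already being handled when the stop request arrives finishes normally, and everything still queued goes down the logging path. In real code the split between the two lists depends on how far the consumer has got when `Cancel()` is called.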
+3

When you pass the CancellationToken to GetConsumingEnumerable, it will not throw an exception when cancellation is requested; it will just stop yielding items. Instead of catching the exception, just check the token:

    foreach (var item in BlockingCollection.GetConsumingEnumerable(CancellationToken))
    {
        // consume item
    }

    if (CancellationToken.IsCancellationRequested)
    {
        foreach (var item in BlockingCollection)
        {
            // log item
        }
    }

Also note that if cancellation is requested and it is possible that CompleteAdding has not been called, you should just iterate over the collection itself rather than call GetConsumingEnumerable. If you know that the producer will complete adding when the operation is cancelled, this is not a problem.
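The difference between iterating the collection itself and consuming from it can be sketched as follows. The class name `SnapshotVsDrainDemo` and the `int` items are assumptions for illustration. The `TryTake` loop is an extra alternative, not something this answer proposes, for cases where the leftover items should also be removed without blocking.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class SnapshotVsDrainDemo
{
    public static (List<int> Snapshot, int CountAfterSnapshot,
                   List<int> Drained, int CountAfterDrain) Run()
    {
        var queue = new BlockingCollection<int>();
        queue.Add(1);
        queue.Add(2);
        queue.Add(3);
        // CompleteAdding() is deliberately NOT called here, so
        // GetConsumingEnumerable() would block once the queue ran empty.

        // Iterating the collection directly walks a snapshot:
        // it neither blocks nor removes anything.
        var snapshot = new List<int>();
        foreach (int pending in queue)
        {
            snapshot.Add(pending);
        }
        int countAfterSnapshot = queue.Count;

        // Alternative (an assumption, not from the answer): TryTake removes
        // items and returns false immediately when the queue is empty.
        var drained = new List<int>();
        while (queue.TryTake(out int taken))
        {
            drained.Add(taken);
        }
        int countAfterDrain = queue.Count;

        return (snapshot, countAfterSnapshot, drained, countAfterDrain);
    }

    public static void Main()
    {
        var r = Run();
        Console.WriteLine("snapshot: " + string.Join(",", r.Snapshot)
            + " (still queued: " + r.CountAfterSnapshot + ")");
        Console.WriteLine("drained:  " + string.Join(",", r.Drained)
            + " (still queued: " + r.CountAfterDrain + ")");
    }
}
```

Which of the two fits depends on whether the pending work only needs to be logged, or also needs to leave the queue.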

+5

Source: https://habr.com/ru/post/957850/

