How to take an item from two BlockingCollections, giving priority to the first collection?

I have two BlockingCollection<T> objects, collection1 and collection2. I want to consume items from these collections, giving priority to the items in collection1. That is, if both collections have items, I want to take from collection1 first. If neither has items, I want to wait until an item becomes available.

I have the following code:

    public static T Take<T>(
        BlockingCollection<T> collection1,
        BlockingCollection<T> collection2) where T : class
    {
        if (collection1.TryTake(out var item1))
        {
            return item1;
        }

        T item2;

        try
        {
            BlockingCollection<T>.TakeFromAny(
                new[] { collection1, collection2 },
                out item2);
        }
        catch (ArgumentException)
        {
            return null;
        }

        return item2;
    }

This code is expected to return null when CompleteAdding has been called on both collections and both are empty.

My main problem with this code is that the documentation for the TakeFromAny method says that it throws an ArgumentException if CompleteAdding was called on "the collection":

ArgumentException

The collections argument is an array of length 0 or contains a null element, or CompleteAdding() has been called on the collection.

Does it throw if CompleteAdding was called on either collection, or only if it was called on both?

What if CompleteAdding was called and the collection still has some items? Does it throw?

I need a reliable way to do this.

In this code, I try to take from collection1 first, because the TakeFromAny documentation does not give any guarantee about which collection an item is taken from when both collections have items.

It also means that if both collections are empty and later receive items at the same time, I may get the item from collection2, which is fine.

EDIT:

The reason I add items to two collections (and not just one) is that the first collection has no upper bound, while the second one does.

More for those who are interested in why I need it:

I use this in an open source project called ProceduralDataflow. See https://github.com/ymassad/ProceduralDataflow for more details.

Each processing node in the dataflow system has two collections. One collection contains items arriving for the first time (so I need to slow down the producer if necessary), and the other contains items arriving for the second (or third, ...) time as a result of a loop in the data flow.

The reason one collection has no upper bound is that I do not want deadlocks caused by loops in the data flow.

1 answer

First, short answers to your specific questions.

Does it throw if CompleteAdding was called on either collection, or only if it was called on both?

Both (all) - but only if there are no items available in any collection.

What if CompleteAdding was called and the collection still has some items? Does it throw?

No. If the collection has an available item, it will be removed from the collection and returned to the caller.
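Both answers can be checked with a small experiment. The following is a minimal sketch, not part of the original question: the class name TakeFromAnyDemo and the helper TakeOrNull are made up for illustration, and the behavior shown is what the current implementation does rather than something the documentation promises.

```csharp
using System;
using System.Collections.Concurrent;

public static class TakeFromAnyDemo
{
    // Hypothetical helper: returns the item taken by TakeFromAny,
    // or null if TakeFromAny threw an ArgumentException.
    public static string TakeOrNull(params BlockingCollection<string>[] collections)
    {
        try
        {
            string item;
            BlockingCollection<string>.TakeFromAny(collections, out item);
            return item;
        }
        catch (ArgumentException)
        {
            return null;
        }
    }

    public static void Main()
    {
        var first = new BlockingCollection<string>();
        var second = new BlockingCollection<string>();

        // One item is still pending when both collections are completed.
        second.Add("pending");
        first.CompleteAdding();
        second.CompleteAdding();

        // An item is still available, so it is returned (no exception).
        Console.WriteLine(TakeOrNull(first, second) ?? "(threw)");

        // Now every collection is completed and empty, so TakeFromAny throws.
        Console.WriteLine(TakeOrNull(first, second) ?? "(threw)");
    }
}
```

With the implementation described in this answer, the first call prints pending and the second prints (threw).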

Conclusion

The documentation seems to be unclear. The part

or CompleteAdding() has been called on the collection

should have been worded differently - something like

or there is no available item in any of the collections, and CompleteAdding() has been called on all of the collections

Justification

Well, I know that relying on the implementation is not good practice, but when the documentation is unclear, the implementation is the only reliable and official source I can think of. So, looking at the reference source, both TakeFromAny and TryTakeFromAny call the private method TryTakeFromAnyCore. It starts with the following:

 ValidateCollectionsArray(collections, false); 

The false here is a bool argument called isAddOperation, which is used inside ValidateCollectionsArray as follows:

    if (isAddOperation && collections[i].IsAddingCompleted)
    {
        throw new ArgumentException(
            SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
    }

This is one of the possible places that could throw an ArgumentException for collections on which CompleteAdding() has been called. As we can see, it does not apply here, because isAddOperation is false (question No. 1).

Then the implementation continues with the following "fast path":

    //try the fast path first
    for (int i = 0; i < collections.Length; i++)
    {
        // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
        if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
            return i;
    }

This confirms the answer to question No. 2.

Finally, if no item is available in any of the collections, the implementation takes the "slow path" by calling another private method, TryTakeFromAnyCoreSlow, which carries the following comment giving a substantial explanation of the implemented behavior:

    //Loop until one of these conditions is met:
    // 1- The operation is succeeded
    // 2- The timeout expired for try* versions
    // 3- The external token is cancelled, throw
    // 4- The operation is TryTake and all collections are marked as completed, return false
    // 5- The operation is Take and all collection are marked as completed, throw

The answer to both of your questions is in case No. 1 and case No. 5 (note the word all). By the way, it also shows the only difference between TakeFromAny and TryTakeFromAny: case No. 5 vs. case No. 4, i.e. throw vs. return -1.
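Given that difference, one way to avoid catching an exception is to call TryTakeFromAny with an infinite timeout and check for -1. This is only a sketch under the assumption that the implementation behaves as described above; the class name PriorityTake and the method name TakeOrDefault are hypothetical and do not come from the question.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public static class PriorityTake
{
    // Hypothetical variant of the question's Take method.
    // Returns null once both collections are completed and empty.
    public static T TakeOrDefault<T>(
        BlockingCollection<T> preferred,
        BlockingCollection<T> fallback) where T : class
    {
        T item;

        // Give priority to the first collection when it already has an item.
        if (preferred.TryTake(out item))
        {
            return item;
        }

        // Block until an item arrives in either collection.
        // Assumption based on the source comment above: when all collections
        // are completed, TryTakeFromAny returns -1 instead of throwing.
        int index = BlockingCollection<T>.TryTakeFromAny(
            new[] { preferred, fallback }, out item, Timeout.Infinite);

        return index < 0 ? null : item;
    }
}
```

As in the original code, an item that arrives in the second collection while the call is blocked may be returned even if the first collection receives one at almost the same moment.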


Source: https://habr.com/ru/post/1271352/

