Coordination of multiple concurrent queues

I currently have a concurrent queue implementation that uses a BlockingQueue as its data store. I now need to introduce a second type of object with a higher priority, which leads me to some form of starvation/priority queue on top of the original queue. So we are working with objects of type A and type B, which are produced from several threads. Any type B objects must be processed before those of type A, but apart from that, FIFO order must be maintained. So if {1A, 1B, 2A, 3A, 2B} is inserted, the processing order should be {1B, 2B, 1A, 2A, 3A}.

I tried a single PriorityBlockingQueue to push type B to the front, but I could not maintain the FIFO requirement (there is no natural order between elements of the same type).

My next thought is to use two concurrent queues. I am looking for common pitfalls or considerations when coordinating access between two queues. Ideally, I would like to do something like this:

    public void add(A a) { aQueue.add(a); }

    public void add(B b) { bQueue.add(b); }

    private void consume() {
        if (!bQueue.isEmpty())
            process(bQueue.poll());
        else if (!aQueue.isEmpty())
            process(aQueue.poll());
    }

Do I need any synchronization or locking if both queues are ConcurrentLinkedQueue (or insert a more appropriate structure here)? Note: I have many producers but only one consumer (a single-threaded ThreadPoolExecutor).

EDIT: If a B arrives after the isEmpty() check, it is fine to process the A as usual and handle the B the next time consume() is called.

3 answers

I'm not sure I understood your situation correctly, but I think this can be solved with a single queue.

You said that the objects in your queue should be ordered by type and then by insertion order. If there is no natural order, just create a sequence generator (an AtomicLong) that gives each object a unique, always-increasing queue identifier. Getting a value from an AtomicLong costs next to nothing unless you are counting nanoseconds.

So your Comparator.compare should look like this:

1) Check the types of the two objects. If they differ (A vs. B), return 1 or -1 so that B sorts first. Otherwise, go to step 2.
2) Compare the identifiers. These are guaranteed to be different.

If you cannot modify the objects (A and B), you can wrap them in another object that holds this identifier.
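The comparator and wrapper described above could be sketched roughly as follows. The names `SequencedQueueDemo`, `Entry`, `ORDER` and `drain` are hypothetical, and plain strings stand in for the real A and B objects. Note that in this sketch the identifier is assigned when the wrapper is constructed, so "FIFO" means the order in which identifiers were handed out, which is an assumption about what the producers need.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class SequencedQueueDemo {
    // Hypothetical wrapper: a payload plus a type flag and a sequence number.
    static final class Entry {
        private static final AtomicLong SEQUENCE = new AtomicLong();

        final String payload;
        final boolean highPriority; // true for type B, false for type A
        final long id = SEQUENCE.getAndIncrement(); // unique, always increasing

        Entry(String payload, boolean highPriority) {
            this.payload = payload;
            this.highPriority = highPriority;
        }
    }

    // Step 1: type B sorts before type A (false < true for Boolean).
    // Step 2: within the same type, the lower (older) id comes first.
    static final Comparator<Entry> ORDER =
            Comparator.comparing((Entry e) -> !e.highPriority)
                      .thenComparingLong(e -> e.id);

    // Polls until the queue is empty and returns the payloads in poll order.
    static List<String> drain(PriorityBlockingQueue<Entry> queue) {
        List<String> out = new ArrayList<>();
        Entry e;
        while ((e = queue.poll()) != null) {
            out.add(e.payload);
        }
        return out;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>(11, ORDER);
        queue.add(new Entry("1A", false));
        queue.add(new Entry("1B", true));
        queue.add(new Entry("2A", false));
        queue.add(new Entry("3A", false));
        queue.add(new Entry("2B", true));
        System.out.println(drain(queue)); // prints [1B, 2B, 1A, 2A, 3A]
    }
}
```

With a single consumer, `queue.take()` can replace `poll()` to block until an element is available.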


Given your requirements, your approach should work fine.

I would make the following change to your code. It makes consume() block until one of the queues returns an object.

    private final Object block = new Object();

    public void add(A a) {
        synchronized (block) {
            aQueue.add(a);
            block.notifyAll(); // wake the consumer if it is waiting
        }
    }

    public void add(B b) {
        synchronized (block) {
            bQueue.add(b);
            block.notifyAll(); // wake the consumer if it is waiting
        }
    }

    // Blocks until one of the queues has an item; B items are always taken first.
    private Object consume() throws InterruptedException {
        Object value = null;
        synchronized (block) {
            while (value == null) {
                value = bQueue.poll();
                if (value == null) value = aQueue.poll();
                if (value == null) block.wait();
            }
        }
        return value;
    }
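To try the wait/notify idea end to end, here is a minimal self-contained sketch. The class name `TwoQueueDemo` and the methods `addA`, `addB` and `take` are hypothetical, and strings stand in for the A and B objects. Because every access in this sketch happens while holding the same lock, it uses plain `ArrayDeque` instances rather than concurrent queues; that is a choice made for the sketch, not something the answer prescribes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class TwoQueueDemo {
    private final Object lock = new Object();
    // Guarded by lock, so non-concurrent queues are sufficient here.
    private final Queue<String> aQueue = new ArrayDeque<>();
    private final Queue<String> bQueue = new ArrayDeque<>();

    public void addA(String a) {
        synchronized (lock) {
            aQueue.add(a);
            lock.notifyAll(); // wake a consumer blocked in take()
        }
    }

    public void addB(String b) {
        synchronized (lock) {
            bQueue.add(b);
            lock.notifyAll(); // wake a consumer blocked in take()
        }
    }

    // Blocks until an item is available; B items are always returned first.
    public String take() throws InterruptedException {
        synchronized (lock) {
            while (true) {
                String value = bQueue.poll();
                if (value == null) value = aQueue.poll();
                if (value != null) return value;
                lock.wait(); // releases lock while waiting, re-checks on wake-up
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TwoQueueDemo demo = new TwoQueueDemo();
        demo.addA("1A");
        demo.addB("1B");
        demo.addA("2A");
        demo.addA("3A");
        demo.addB("2B");

        List<String> processed = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            processed.add(demo.take());
        }
        System.out.println(processed); // prints [1B, 2B, 1A, 2A, 3A]
    }
}
```

The loop around `wait()` matters: a waiting thread can wake up without an item being available, so the queues are re-checked every time.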

You need to synchronize the getNextItem() method. This assumes that A and B both implement a QueueItem interface.

    public void add(A a) { aQueue.add(a); }

    public void add(B b) { bQueue.add(b); }

    private void consume() { process(getNextItem()); }

    private QueueItem getNextItem() {
        synchronized (bQueue) {
            if (!bQueue.isEmpty()) return bQueue.poll();
            return aQueue.poll();
        }
    }

Source: https://habr.com/ru/post/1334532/

