How to implement round-robin ordering for a PriorityBlockingQueue?

I have a single PriorityBlockingQueue. One thread consumes one message at a time from this queue and processes it. Several other threads insert messages into the queue. The producer threads assign an integer priority to each message they send. A static AtomicLong is used to assign each message a unique, monotonically increasing identifier. The queue's Comparator orders messages by priority first, and then messages with equal priority are ordered by identifier (lowest identifier first).
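
For reference, the ordering described above can be sketched roughly as follows. The Message class, its field names, and the assumption that a smaller priority value is served first are illustrative only; the original post does not show them.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical message type; the real class is not shown in the post.
final class Message {
    // Shared counter that gives every message a unique, increasing id.
    private static final AtomicLong NEXT_ID = new AtomicLong();

    final int priority;   // assumption: smaller value is served first
    final long id;        // tie-breaker that preserves send order
    final String payload;

    Message(int priority, String payload) {
        this.priority = priority;
        this.id = NEXT_ID.getAndIncrement();
        this.payload = payload;
    }

    // Order by priority first, then by id (lowest id first).
    static final Comparator<Message> ORDER =
            Comparator.comparingInt((Message m) -> m.priority)
                      .thenComparingLong(m -> m.id);
}

final class MessageOrderingDemo {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Message> queue =
                new PriorityBlockingQueue<>(11, Message.ORDER);
        queue.add(new Message(2, "low-1"));
        queue.add(new Message(1, "high-1"));
        queue.add(new Message(1, "high-2"));
        // Prints high-1, high-2, low-1 (one per line).
        while (!queue.isEmpty()) {
            System.out.println(queue.take().payload);
        }
    }
}
```

With this ordering the producer's identity plays no part in the comparison, which is why a burst from one producer is served in full before later messages of the same priority from anyone else.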

Problem: Sometimes one producer sends a large number of messages. This starves the other producers, whose messages wait until the whole burst has been processed. What I would like is for the consumer to round-robin between producers for messages of equal priority (while still processing messages of the same priority from the same producer in the order they were sent). But I can't figure out how to write a Comparator for this.

Another alternative I have considered is a separate queue for each producer. However, I don't think this would work, because I don't know how a single thread can wait on multiple queues.

3 answers

I feel that it's easier to implement this using one Queue for each producer. One thread cannot wait on several Queues, but you can combine all of the Queues into one helper class so that it does not need to.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.annotation.concurrent.GuardedBy;

public class RoundRobin<P, E> {
    private final Lock lock = new ReentrantLock();
    private final Condition added = lock.newCondition();

    @GuardedBy("lock") private final Map<P, Queue<E>> queues = new LinkedHashMap<>();

    public boolean add(P producer, E item) {
        lock.lock();
        try {
            if (!queues.containsKey(producer)) {
                queues.put(producer, new PriorityBlockingQueue<>());
            }

            added.signalAll();
            return queues.get(producer).add(item);
        } finally {
            lock.unlock();
        }
    }

    public Iterator<E> roundRobinIterator() {
        return new Iterator<E>() {
            private Iterator<? extends Queue<E>> i = null;
            private boolean singlePass = true;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public E next() {
                lock.lock();
                try {
                    while (true) {
                        if (i == null || !i.hasNext()) {
                            i = queues.values().iterator();
                            singlePass = true;
                        }

                        while (i.hasNext()) {
                            Queue<E> q = i.next();
                            if (!q.isEmpty()) {
                                if (singlePass) {
                                    // copy the iterator to prevent
                                    // ConcurrentModificationExceptions
                                    singlePass = false;
                                    i = copy(i);
                                }
                                return q.poll();
                            }
                        }

                        if (singlePass) {
                            // If singlePass is true then we just checked every
                            // queue and they were all empty.
                            // Wait for another element to be added.
                            added.await();
                        }
                    }
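                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the caller can still observe it.
                    Thread.currentThread().interrupt();
                    throw new NoSuchElementException(e.getMessage());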
                } finally {
                    lock.unlock();
                }
            }

            private <T> Iterator<? extends T> copy(Iterator<? extends T> i) {
                List<T> copy = new ArrayList<>();
                while (i.hasNext()) {
                    copy.add(i.next());
                }
                return copy.iterator();
            }
        };
    }
}

I think I would do something like this:

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class RRQueue<M> {
    private final ThreadLocal<Queue<M>> threadQueue = new ThreadLocal<>();
    private final List<Queue<M>> queues;
    private int current = 0;

    public RRQueue() {
        this.queues = new ArrayList<>();
    }

    public synchronized void add(M msg) {
        Queue<M> queue = threadQueue.get();
        if (queue == null) {
            queue = new LinkedList<>(); // or whatever
            queues.add(queue);
            threadQueue.set(queue);
        }
        queue.add(msg);
        notify();
    }

    public synchronized M get() throws InterruptedException {
        while (true) {
            for (int i = 0; i < queues.size(); ++i) {
                Queue<M> queue = queues.get(current);
                current = (current+1)%queues.size();
                if (!queue.isEmpty()) {
                    return queue.remove();
                }
            }
            wait();
        }
    }
}
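
Note that the class above rotates between producer threads but does not look at message priority. A minimal sketch of one way to get both, assuming each message carries an integer priority where a smaller value is served first: keep one insertion-ordered map of per-producer FIFO queues per priority level, always serve the lowest level, and move the producer just served to the back of that level. FairPriorityQueue and its method names are hypothetical and not taken from either answer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: priority first, round-robin between producers within a priority.
final class FairPriorityQueue<P, E> {
    // priority -> (producer -> FIFO of that producer's messages).
    // Invariant: no empty deque and no empty level is ever stored.
    private final TreeMap<Integer, LinkedHashMap<P, Deque<E>>> levels = new TreeMap<>();

    public synchronized void put(P producer, int priority, E item) {
        levels.computeIfAbsent(priority, k -> new LinkedHashMap<>())
              .computeIfAbsent(producer, k -> new ArrayDeque<>())
              .addLast(item);
        notifyAll(); // wake the consumer if it is waiting in take()
    }

    public synchronized E take() throws InterruptedException {
        while (levels.isEmpty()) {
            wait();
        }
        Map.Entry<Integer, LinkedHashMap<P, Deque<E>>> level = levels.firstEntry();
        LinkedHashMap<P, Deque<E>> producers = level.getValue();
        // The first entry of the insertion-ordered map is the producer whose turn it is.
        Iterator<Map.Entry<P, Deque<E>>> it = producers.entrySet().iterator();
        Map.Entry<P, Deque<E>> turn = it.next();
        P producer = turn.getKey();
        Deque<E> pending = turn.getValue();
        it.remove();
        E item = pending.pollFirst();
        if (!pending.isEmpty()) {
            producers.put(producer, pending); // back of the line for this level
        } else if (producers.isEmpty()) {
            levels.remove(level.getKey());    // level fully drained
        }
        return item;
    }
}

final class FairPriorityQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        FairPriorityQueue<String, String> q = new FairPriorityQueue<>();
        q.put("A", 1, "a1");
        q.put("A", 1, "a2");
        q.put("A", 1, "a3");
        q.put("B", 1, "b1");
        q.put("C", 0, "c1");
        // Prints c1, a1, b1, a2, a3 (one per line).
        for (int i = 0; i < 5; i++) {
            System.out.println(q.take());
        }
    }
}
```

Messages from one producer at one priority still come out in send order, because each producer has its own FIFO per level. A single monitor is enough here since there is only one consumer.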



Source: https://habr.com/ru/post/1569665/

