Testing PriorityBlockingQueue in ThreadPoolExecutor

I understood my ThreadPoolExecutor with PriorityBlockingQueue, as in this example: stack overflow

and wrote a test:

PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(16); executorService.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); Thread.sleep(1000); System.out.println("1"); } catch (InterruptedException e) { e.printStackTrace(); } } }, 1); executorService.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); Thread.sleep(1000); System.out.println("3"); } catch (InterruptedException e) { e.printStackTrace(); } } }, 3); executorService.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); Thread.sleep(1000); System.out.println("2"); } catch (InterruptedException e) { e.printStackTrace(); } } }, 2); executorService.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); Thread.sleep(1000); System.out.println("5"); } catch (InterruptedException e) { e.printStackTrace(); } } }, 5); executorService.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); Thread.sleep(1000); System.out.println("4"); } catch (InterruptedException e) { e.printStackTrace(); } } }, 4); executorService.shutdown(); try { executorService.awaitTermination(30, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } 

But in the end, I do not get 1 2 3 4 5, I get a random order of these numbers. Is there a problem with the test or something else? And if at first, how can it be tested correctly?

+4
java multithreading threadpool priority-queue threadpoolexecutor
May 30 '13 at 10:37
source share
2 answers

Priority is only taken into account if the pool is fully occupied and you are sending several new tasks. If you define your pool with only one thread, you should get the expected result. In your example, all tasks are performed simultaneously, and one ends first, somewhat randomly.

By the way, the related implementation has a problem and throws an exception if your queue is full and you send new tasks.

Below is a working example of what you are trying to achieve (I simplified newTaskFor simplified way, just to make it work - you can improve this part).

It prints: 1 2 3 4 5 .

 public class Test { public static void main(String[] args) { PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(1); executorService.submit(getRunnable("1"), 1); executorService.submit(getRunnable("3"), 3); executorService.submit(getRunnable("2"), 2); executorService.submit(getRunnable("5"), 5); executorService.submit(getRunnable("4"), 4); executorService.shutdown(); try { executorService.awaitTermination(30, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } public static Runnable getRunnable(final String id) { return new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println(id); } catch (InterruptedException e) { e.printStackTrace(); } } }; } static class PriorityExecutor extends ThreadPoolExecutor { public PriorityExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } //Utitlity method to create thread pool easily public static ExecutorService newFixedThreadPool(int nThreads) { return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>()); } //Submit with New comparable task public Future<?> submit(Runnable task, int priority) { return super.submit(new ComparableFutureTask(task, null, priority)); } //execute with New comparable task public void execute(Runnable command, int priority) { super.execute(new ComparableFutureTask(command, null, priority)); } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return (RunnableFuture<T>) callable; } @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return (RunnableFuture<T>) runnable; } } static class ComparableFutureTask<T> extends FutureTask<T> implements Comparable<ComparableFutureTask<T>> { volatile int priority = 0; public ComparableFutureTask(Runnable runnable, T result, int priority) { super(runnable, result); this.priority = priority; } public ComparableFutureTask(Callable<T> callable, int priority) { super(callable); this.priority = priority; } @Override public int compareTo(ComparableFutureTask<T> o) { return Integer.valueOf(priority).compareTo(o.priority); } } } 
+12
May 30 '13 at 11:03
source share

You have 16 threads and only 5 tasks, that is, all of them are executed at the same time, and priority does not actually matter.

Priority matters only when pending tasks.

To show this, if you set only one thread for your example, you will get the expected result.

+1
May 30 '13 at 10:52
source share



All Articles