Is there an ExecutorService that is suitable for a huge number of very short-lived tasks? I envision something that internally tries busy-waiting before switching over to synchronized (blocking) waiting. Preserving the order of the tasks is not important, but it should be possible to enforce memory consistency (all tasks happen-before the main thread regains control).
Below is a test consisting of 100,000 tasks, each of which generates 100 doubles in a row. It takes the size of the thread pool as a command-line parameter and, when one is given, always tests both the serial and the parallel version. (If no command-line argument is given, only the serial version is tested.) The parallel version uses a fixed-size thread pool, and task allocation is not even part of the time measurement. However, the parallel version is never faster than the serial version; I tried up to 80 threads (on a machine with 40 hyper-threaded cores). Why?
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Micro-benchmark comparing a serial loop of {@code TASKS * SUBTASKS} {@code Random.nextDouble()}
 * calls against the same work split into {@code TASKS} tiny tasks run on a fixed-size thread pool.
 *
 * <p>Usage: with no argument only the serial version runs; with a thread count as the first
 * argument the serial version runs first and then the pooled version with that many threads.
 *
 * <p>NOTE(review): each pooled task does only {@code SUBTASKS} cheap calls, so per-task executor
 * overhead (queue hand-off, {@code Future} bookkeeping) is likely of the same order as the work
 * itself. That would explain why adding threads does not help; batching many iterations per
 * task is the usual remedy. Confirm with a profiler.
 */
public class ExecutorPerfTest {
    /** Number of tasks submitted to the pool (and outer iterations of the serial loop). */
    public static final int TASKS = 100000;
    /** Number of {@code nextDouble()} calls performed per task. */
    public static final int SUBTASKS = 100;

    /**
     * One {@link Random} per thread so tasks never contend on a shared generator.
     * {@code initialValue()} is invoked separately by each thread on its first {@code get()}, and
     * {@code new Random()} is itself thread-safe, so no extra locking is needed here.
     */
    static final ThreadLocal<Random> R = new ThreadLocal<Random>() {
        @Override
        protected Random initialValue() {
            return new Random();
        }
    };

    /** Serial baseline: runs all {@code TASKS * SUBTASKS} calls on the calling thread. */
    public class SeqTest implements Runnable {
        @Override
        public void run() {
            Random r = R.get();
            for (int i = 0; i < TASKS; i++) {
                for (int j = 0; j < SUBTASKS; j++) {
                    r.nextDouble();
                }
            }
        }
    }

    /**
     * Pooled version: {@code TASKS} callables, each doing {@code SUBTASKS} calls, executed with
     * {@code invokeAll} on a fixed-size pool. The pool is shut down after a single {@link #run()}.
     */
    public class ExecutorTest implements Runnable {
        /** A single tiny unit of work; returns the last generated value. */
        private final class RandomGenerating implements Callable<Double> {
            @Override
            public Double call() {
                double d = 0;
                Random r = R.get();
                for (int j = 0; j < SUBTASKS; j++) {
                    d = r.nextDouble();
                }
                return d;
            }
        }

        private final ExecutorService threadPool;
        private final ArrayList<Callable<Double>> tasks = new ArrayList<Callable<Double>>(TASKS);

        /**
         * Creates the pool and pre-allocates all task objects, so that allocation is not timed.
         *
         * @param nThreads pool size; {@code Executors.newFixedThreadPool} rejects values {@code <= 0}
         */
        public ExecutorTest(int nThreads) {
            // NOTE(review): fixed pools typically start worker threads lazily on first submission,
            // so thread start-up still falls inside the timed run() — confirm if that matters.
            threadPool = Executors.newFixedThreadPool(nThreads);
            for (int i = 0; i < TASKS; i++) {
                tasks.add(new RandomGenerating());
            }
        }

        /**
         * Runs all tasks and blocks until every one has completed, then shuts the pool down.
         * Task results (and any task exceptions) are not inspected.
         */
        @Override
        public void run() {
            try {
                threadPool.invokeAll(tasks);
            } catch (InterruptedException e) {
                // invokeAll cancels unfinished tasks before throwing, but waiting in Future.get()
                // cleared the thread's interrupt flag; restore it so callers can observe it.
                Thread.currentThread().interrupt();
                System.err.println("Interrupted while waiting for tasks: " + e);
            } finally {
                threadPool.shutdown();
            }
        }
    }

    /**
     * Entry point.
     *
     * @param args optional: {@code args[0]} is the thread-pool size for the parallel run
     */
    public static void main(String[] args) {
        ExecutorPerfTest executorPerfTest = new ExecutorPerfTest();
        // When a thread count is given, run the serial baseline first so both timings are printed.
        if (args.length > 0) {
            executorPerfTest.start(new String[] {});
        }
        executorPerfTest.start(args);
    }

    /** Builds the serial or pooled variant (untimed), then times and reports one run of it. */
    private void start(String[] args) {
        final Runnable r;
        if (args.length == 0) {
            r = new SeqTest();
        } else {
            final int nThreads = Integer.parseInt(args[0]);
            r = new ExecutorTest(nThreads);
        }
        System.out.printf("Starting\n");
        long t = System.nanoTime();
        r.run();
        long dt = System.nanoTime() - t;
        System.out.printf("Time: %.6fms\n", 1e-6 * dt);
    }
}
source share