Java: combined multi-threaded / single-threaded task queue

I like the ExecutorService series of classes / interfaces. I do not need to worry about threads; I take an instance ExecutorServiceand use it to schedule tasks, and if I want to use an 8-thread or 16-thread pool, well, great, I don’t have to worry about this, it just happens depending on how the ExecutorService is configured. Hooray!

But what should I do if some of my tasks need to be performed in sequential order? Ideally, I would ask ExecutorService to allow me to schedule these tasks in a single thread, but there seemed to be no means to do this.

edit: Tasks are unknown in advance, this is an unlimited series of tasks that are unpredictably generated by various kinds of events (think of a random / unknown arrival process: for example, clicks from a Geiger counter or a keypress event).

+3
source share
4 answers

You can write an implementation Runnablethat performs some tasks and performs them sequentially.

Sort of:

public class SerialRunner implements Runnable {
    private List<Runnable> tasks;

    public SerialRunner(List<Runnable> tasks) {
        this.tasks = tasks;
    }

    public void run() {
        for (Runnable task: tasks) {
            task.run();
        }
    }
}
+6
source

, Executors.newSingleThreadExecutor() , , . , , ,

executor.submit(new Runnable() {
   public void run() {
        myTask1.call();
        myTask2.call();
        myTask3.call();
    }});

, , myTask2 , myTask1 .

+2

, , - - , , ( ). , Queue (), ( ExecutorService , ), a Pipelineable (aka ) PipelineManager, . , , , , 1 , , , , , .

, n , , , , , () .

JDK ExecutorService, BlockingQueue ( , a ThreadPoolExecutor), , , " ". , , , , , singleThreadExecutor .

()

, , ExecutorService, ThreadPoolExecutor ( ), ; 1, n . - ( , , , !)

public class PipeliningExecutorService<T extends Pipelineable> implements ExecutorService {
    private Map<Key, ExecutorService> executors;
    private ExecutorService generalPurposeExecutor;

    // ExecutorService methods here, for example
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        Pipelineable pipelineableTask = convertTaskToPipelineable(task);
        Key taskKey = pipelineable.getKey();
        ExecutorService delegatedService = executors.get(taskKey);
        if (delegatedService == null)  delegatedService = generalPurposeExecutor;
        return delegatedService.submit(task);
    }
}

public interface Pipelineable<K,V> {
    K getKey();
    V getValue();
}

, ExecutorService , , , - , Pipelineable fallback, ( , ).

+1

Hmm, I thought about something, not quite sure if this will work, but maybe it will (unverified code). This skips the subtleties (handling exceptions, cancellation, justice to other tasks of the main Contractor, etc.), but it may be useful.

 class SequentialExecutorWrapper implements Runnable
 {
     final private ExecutorService executor;

     // queue of tasks to execute in sequence
     final private Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();

     // semaphore for pop() access to the task list
     final private AtomicBoolean taskInProcess = new AtomicBoolean(false);

     public void submit(Runnable task)
     {
         // add task to the queue, try to run it now
         taskQueue.offer(task);
         if (!tryToRunNow())
         {
             // this object is running tasks on another thread
             // do we need to try again or will the currently-running thread
             // handle it? (depends on ordering between taskQueue.offer()
             // and the tryToRunNow(), not sure if there is a problem)
         }
     }

     public void run()
     {
         tryToRunNow();
     }

     private boolean tryToRunNow()
     {
         if (taskInProcess.compareAndSet(false, true))
         {
             // yay! I own the task queue!
             try {
                 Runnable task = taskQueue.poll();
                 while (task != null)
                 {
                     task.run();
                     task = taskQueue.poll();
                 }
             }
             finally
             {
                 taskInProcess.set(false);
             }
             return true;
         }
         else
         {
             return false;
         }
     }
0
source

Source: https://habr.com/ru/post/1730444/


All Articles