How to keep a thread executing while waiting for an asynchronous callback

I have the scenario depicted in the diagram below.

enter image description here

Here the main thread is my Java application. It opens one WM thread for execution. WM does the processing work, and to complete it WM has to call a number of tasks. Suppose it contains the tasks T1, T2, and T3.

T3 depends on T2, and T2 depends on T1. WM first calls RM to complete task T1. T1 can respond either with paged (partial) results or only once T1 has completed.

My question is: how can I wait for T1 to complete and only then start T2? And how can I notify WM when T1 has partially completed and is sending paging data?

This is the simple scenario, but what about the case of T1, T2, T3, T4, where T3 depends on both T1 and T2?

Code:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;

    public class TestAsync implements TaskCallBack {
        public static ExecutorService exService = Executors.newFixedThreadPool(5);

        public static void main(String args[]) throws InterruptedException, ExecutionException {
            Task t1 = new Task();
            t1.doTask(new TestAsync());
        }

        public static ExecutorService getPool() {
            return exService;
        }

        @Override
        public void taskCompleted(String obj) {
            System.out.println(obj);
        }
    }

    class Task {
        public void doTask(TaskCallBack tcb) throws InterruptedException, ExecutionException {
            FutureTask<String> ft = new FutureTask<>(new Task1());
            TestAsync.getPool().execute(ft);
            // ft.get() blocks the calling thread until Task1 has finished.
            tcb.taskCompleted(ft.get());
        }
    }

    class Task1 implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName());
            return "done";
        }
    }

    // Declared at top level, with the method name matching the @Override in TestAsync.
    interface TaskCallBack {
        public void taskCompleted(String obj);
    }
2 answers

This is a pretty interesting topic. I ran into a similar problem while developing a highly parallel batch processing solution. I will share my conclusions, but first I have to say that an ad hoc solution is always a bad idea for any parallel system.

Debugging, optimization, and further development can become a nightmare without proper architectural support. Say we have three dependent tasks:

enter image description here

First solution

Introduce a composite (compound) task abstraction, so that dependent tasks run in the correct order and you get rid of delays, waits and locks, complex task management, etc.

I will use simplified code to illustrate this approach:

    import java.util.ArrayList;
    import java.util.List;

    /**
     * Defines a high-level task contract.
     * Let's pretend it is enough to have it this simple.
     */
    interface Task extends Runnable {
    }

    /**
     * Defines a simple way to group dependent tasks.
     *
     * Please note, this is the simplest way to make sure dependent tasks will be
     * executed in a specified order without any additional overhead.
     */
    class CompoundTasks implements Task {

        // Assumption: the original elides the initializer ("..."); any List works here.
        private final List<Task> tasks = new ArrayList<>();

        public void add(Task task) {
            tasks.add(task);
        }

        @Override
        public void run() {
            // Runs the tasks one after another, in insertion order, on the calling thread.
            for (Task t : tasks) {
                t.run();
            }
        }
    }
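To show how this might be used for the T1, T2, T3 chain from the question, here is a minimal sketch. The `Task` and `CompoundTasks` types are repeated so that it compiles on its own; `CompoundDemo`, `runChain`, and the single-thread executor are my own assumptions, not part of the original design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Same shape as the Task / CompoundTasks pair above, repeated for self-containment.
interface Task extends Runnable { }

class CompoundTasks implements Task {
    private final List<Task> tasks = new ArrayList<>();

    public void add(Task task) { tasks.add(task); }

    @Override
    public void run() {
        for (Task t : tasks) {
            t.run();
        }
    }
}

// Hypothetical demo class: submits the whole chain as one unit of work.
class CompoundDemo {

    static List<String> runChain() {
        // Only the single worker thread writes to this list; Future.get()
        // makes those writes visible to the caller afterwards.
        List<String> log = new ArrayList<>();

        CompoundTasks chain = new CompoundTasks();
        chain.add(() -> log.add("T1"));
        chain.add(() -> log.add("T2")); // runs only after T1 has returned
        chain.add(() -> log.add("T3")); // runs only after T2 has returned

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> done = pool.submit(chain);
            // The submitting thread is free to do other work here.
            done.get(); // wait only at the point where the result is needed
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runChain()); // prints [T1, T2, T3]
    }
}
```

The ordering comes for free because the chain is a single `Runnable`: the executor never sees T2 or T3 as separate work items, so there is nothing to synchronize between them.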

Second solution

Give tasks explicit dependencies and make the executors aware of them. In principle the rule is quite simple: if a task has unresolved dependencies, it should be postponed. This approach is easy to implement and works very well.

Please note that the second solution carries a small performance overhead, because some resources are needed to check tasks, manage queues, etc.

Let me extend our task-based approach:

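    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    /**
     * Defines yet another abstraction to make dependencies
     * visible and properly tracked.
     */
    abstract class DependentTask implements Task {

        // Assumption: the original elides the initializer ("...").
        private final List<DependentTask> dependencies = new ArrayList<>();

        // Added: without a completion flag, a dependency that has no dependencies
        // of its own would count as resolved before it had ever run.
        private volatile boolean completed;

        public void addDependency(DependentTask task) {
            dependencies.add(task);
        }

        public boolean isCompleted() {
            return completed;
        }

        void markCompleted() {
            completed = true;
        }

        /**
         * Verifies task can be processed: every direct dependency has completed.
         */
        public boolean hasDependenciesResolved() {
            for (DependentTask t : dependencies) {
                if (!t.isCompleted()) {
                    return false;
                }
            }
            return true;
        }

        @Override
        public abstract void run();
    }

    /**
     * Implements a very basic queue aware of task dependencies.
     *
     * Basically, the idea is just to avoid any blocking state. If a task can't
     * be processed (because of unresolved dependencies) it should be
     * postponed and verified later.
     */
    class TaskQueue<T extends DependentTask> implements Runnable {

        // Assumption: the original elides the initializer; a thread-safe queue is used here.
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();

        // Added so that callers can actually enqueue tasks.
        public void add(T task) {
            queue.add(task);
        }

        @Override
        public void run() {
            while (true) {
                T task = queue.poll();
                if (task == null) {
                    // sleep for some reasonable amount of time
                } else if (task.hasDependenciesResolved()) {
                    task.run();           // no unresolved dependencies: process the task
                    task.markCompleted(); // let dependent tasks see that it is done
                } else {
                    queue.add(task);      // return the task to the queue
                }
            }
        }
    }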
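To make the "postpone if unresolved" rule concrete for the case where T3 depends on both T1 and T2, here is a small single-threaded sketch. It is a simplification and not the queue above: `Node`, `PostponeDemo`, `drain`, and `demo` are hypothetical names, and the loop stops when the queue is empty instead of running forever.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a dependent task: a name, its dependencies, a done flag.
class Node {
    final String name;
    final List<Node> deps;
    boolean done;

    Node(String name, Node... deps) {
        this.name = name;
        this.deps = Arrays.asList(deps);
    }

    // A node is ready once every direct dependency has completed.
    boolean ready() {
        for (Node d : deps) {
            if (!d.done) {
                return false;
            }
        }
        return true;
    }
}

class PostponeDemo {

    // Processes the queue until it is empty and returns the execution order.
    // Note: this never terminates if a dependency is cyclic or never enqueued.
    static List<String> drain(Queue<Node> queue) {
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            Node n = queue.poll();
            if (n.ready()) {
                order.add(n.name); // "run" the task
                n.done = true;
            } else {
                queue.add(n);      // postpone: back to the end of the queue
            }
        }
        return order;
    }

    static List<String> demo() {
        Node t1 = new Node("T1");
        Node t2 = new Node("T2");
        Node t3 = new Node("T3", t1, t2); // T3 depends on both T1 and T2

        // Deliberately enqueue T3 first to show that it gets postponed.
        Queue<Node> queue = new ArrayDeque<>(Arrays.asList(t3, t1, t2));
        return drain(queue);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [T1, T2, T3]
    }
}
```

T3 is polled first, found unready, and re-queued behind T1 and T2, so it runs last without any thread ever blocking on it.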

Both approaches are easy to trace, so you can always understand what is happening.


If you know the number of tasks for T1 and T2, you can use CountDownLatch. It is best to hold back the T2 tasks and add them to the executor only once all T1 tasks have completed (and likewise for T2 → T3).
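A minimal sketch of that idea, assuming a fixed pool of four threads and trivial task bodies that only record their phase (`LatchDemo` and `runPhases` are hypothetical names):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchDemo {

    // Runs t1Count T1 tasks, waits for all of them, then runs t2Count T2 tasks.
    static List<String> runPhases(int t1Count, int t2Count) {
        List<String> log = new CopyOnWriteArrayList<>(); // thread-safe log
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch t1Done = new CountDownLatch(t1Count);
        try {
            for (int i = 0; i < t1Count; i++) {
                pool.execute(() -> {
                    try {
                        log.add("T1");
                    } finally {
                        t1Done.countDown(); // count down even if the task fails
                    }
                });
            }

            // Only the coordinating thread blocks; pool threads never wait.
            t1Done.await();

            // All T1 tasks are finished, so T2 tasks can now be submitted.
            for (int i = 0; i < t2Count; i++) {
                pool.execute(() -> log.add("T2"));
            }

            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // no-op if the pool has already terminated
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runPhases(3, 2)); // prints [T1, T1, T1, T2, T2]
    }
}
```

Because the wait happens on the submitting thread, the pool itself is never occupied by tasks that are merely waiting for an earlier phase.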

If you cannot change the code that adds the tasks, you can instead make each T2 task wait until all T1 tasks have completed, but this leads to liveness problems (an executor pool filled with nothing but sleeping threads) if the tasks are added out of order.
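The following sketch illustrates that risk under simplified assumptions: one T2 task that waits on the latch inside the pool, one T1 task submitted after it, and a timeout so that the demo always finishes. `StarvationDemo` and `t2SawT1` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class StarvationDemo {

    // Returns true if the T2 task saw T1 complete before its wait timed out.
    static boolean t2SawT1(int poolSize, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch t1Done = new CountDownLatch(1);
        try {
            // Out of order: T2 is submitted first and waits inside the pool.
            Callable<Boolean> t2 =
                    () -> t1Done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            Future<Boolean> t2Result = pool.submit(t2);

            // T1 is submitted second; it needs a free pool thread to run.
            pool.execute(t1Done::countDown);

            return t2Result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // One thread: T2 occupies it, so T1 cannot start until T2 gives up.
        System.out.println(t2SawT1(1, 200));  // prints false
        // Two threads: T1 runs on the second thread and releases T2.
        System.out.println(t2SawT1(2, 2000)); // prints true
    }
}
```

Without the timeout, the single-thread case would never finish: the only worker would sleep forever waiting for a task that is stuck behind it in the queue.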


Source: https://habr.com/ru/post/982128/

