Java: Design Ideas for Stopping a Callable Thread

I am writing a program that does some batch processing. The batch elements can be processed independently of each other, and we want to minimize the overall processing time. So, instead of looping through the elements of the batch one at a time, I am using an ExecutorService and submitting Callable objects to it:

    public void process(Batch batch) {
        ExecutorService execService = Executors.newCachedThreadPool();
        CopyOnWriteArrayList<Future<BatchElementStatus>> futures =
                new CopyOnWriteArrayList<Future<BatchElementStatus>>();

        for (BatchElement element : batch.getElement()) {
            Future<BatchElementStatus> future =
                    execService.submit(new ElementProcessor(batch.getID(), element));
            futures.add(future);
        }

        boolean done = false;
        while (!done) {
            for (Future<BatchElementStatus> future : futures) {
                try {
                    if (future.isDone()) {
                        futures.remove(future);
                    }
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                }
                if (futures.size() == 0) {
                    done = true;
                }
            }
        }
    }

We want to be able to cancel the batch processing. Since the elements are not processed in a loop, I cannot just check at the top of each iteration whether a cancel flag is set.

We are using a JMS topic that both the BatchProcessor and the ElementProcessor listen to, so that they are notified when the batch has been cancelled.

There are several steps in the ElementProcessor call() method. After some of them the processing can be stopped safely, but there is a point of no return. The class has this basic design:

    public class ElementProcessor implements Callable<BatchElementStatus>, MessageListener {
        private boolean cancelled = false;

        public void onMessage(Message msg) {
            // get message object
            cancelled = true;
        }

        public BatchElementStatus call() {
            String status = SUCCESS;
            if (!cancelled) { doSomethingOne(); } else { doRollback(); status = CANCELLED; }
            if (!cancelled) { doSomethingTwo(); } else { doRollback(); status = CANCELLED; }
            if (!cancelled) { doSomethingThree(); } else { doRollback(); status = CANCELLED; }
            if (!cancelled) { doSomethingFour(); } else { doRollback(); status = CANCELLED; }

            // After this point, we cannot cancel or pause the processing
            doSomethingFive();
            doSomethingSix();
            return new BatchElementStatus("SUCCESS");
        }
    }

I am wondering if there is a better way to check whether the batch/element has been cancelled, other than wrapping the method calls/blocks of code in the call() method in if (!cancelled) checks.

Any suggestions?

+4
3 answers

I do not think that you can do much better than what you are doing now, but here is an alternative:

    public BatchElementStatus call() {
        return callMethod(1);
    }

    private BatchElementStatus callMethod(int methodCounter) {
        // Single place where the cancel flag is checked before each step
        if (cancelled) {
            doRollback();
            return new BatchElementStatus("FAIL");
        }
        switch (methodCounter) {
            case 1 : doSomethingOne(); break;
            case 2 : doSomethingTwo(); break;
            ...
            case 5 : doSomethingFive(); doSomethingSix();
                     return new BatchElementStatus("SUCCESS");
        }
        return callMethod(methodCounter + 1);
    }

In addition, you should make cancelled volatile, since onMessage will be called from another thread. But you probably do not want to use onMessage and cancelled anyway (see below).
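The same idea as the switch above can also be sketched with a list of steps, so that the flag is checked in exactly one place. This is only an illustration under assumptions: SteppedProcessor, its constructor arguments and the cancel() method are hypothetical names, and the status is a plain String rather than the question's BatchElementStatus.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-in for the question's ElementProcessor.
final class SteppedProcessor implements Callable<String> {
    // volatile: written by the listener thread, read by the worker thread
    private volatile boolean cancelled = false;
    private final List<Runnable> cancellableSteps; // steps one to four
    private final Runnable finalSteps;             // past the point of no return
    private final Runnable rollback;

    SteppedProcessor(List<Runnable> cancellableSteps, Runnable finalSteps, Runnable rollback) {
        this.cancellableSteps = cancellableSteps;
        this.finalSteps = finalSteps;
        this.rollback = rollback;
    }

    // In the question's design this would be called from onMessage(...).
    void cancel() {
        cancelled = true;
    }

    @Override
    public String call() {
        for (Runnable step : cancellableSteps) {
            // The only place the flag is checked: once before each step.
            if (cancelled) {
                rollback.run();
                return "CANCELLED";
            }
            step.run();
        }
        finalSteps.run();
        return "SUCCESS";
    }
}
```

Unlike the code in the question, this returns as soon as the rollback has run, so the final steps are never reached after a cancellation.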

Other minor points:

1) CopyOnWriteArrayList<Future<BatchElementStatus>> futures should just be an ArrayList. Using a concurrent collection suggests that futures is accessed from many threads.
2) while (!done) should be replaced by while (!futures.isEmpty()), and done removed.
3) You should probably just call future.cancel(true) instead of cancelling through messaging. You would then check if (Thread.interrupted()) instead of if (cancelled). If you want to kill all the futures, just call execService.shutdownNow(); your tasks must handle interrupts for this to work.
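A minimal sketch of point 3, assuming the worker polls the interrupt flag between steps. InterruptibleTask, CancelDemo and the rolledBack field are hypothetical names, and the spin loop merely stands in for the real cancellable steps.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical task that notices an interrupt and rolls back.
final class InterruptibleTask implements Callable<String> {
    final CountDownLatch started = new CountDownLatch(1);
    final CountDownLatch finished = new CountDownLatch(1);
    volatile boolean rolledBack = false;

    @Override
    public String call() {
        started.countDown();
        try {
            // Stand-in for the cancellable steps: loop until interrupted.
            while (!Thread.interrupted()) {
                Thread.yield();
            }
            rolledBack = true; // stand-in for doRollback()
            return "CANCELLED";
        } finally {
            finished.countDown();
        }
    }
}

final class CancelDemo {
    // Returns true if the running task was cancelled and rolled back.
    static boolean cancelRunningTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            InterruptibleTask task = new InterruptibleTask();
            Future<String> future = pool.submit(task);
            task.started.await();   // wait until call() is running
            future.cancel(true);    // true: interrupt the worker thread
            boolean finished = task.finished.await(5, TimeUnit.SECONDS);
            return finished && future.isCancelled() && task.rolledBack;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Once a future has been cancelled, get() throws CancellationException, so the "CANCELLED" return value is not visible to the caller; the status would have to be reported some other way.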

EDIT

Instead of while (!done) { for (... futures) { ... } }, you should use ExecutorCompletionService. It does what you are trying to do, and probably does it much better. The API documentation has a complete example.
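As a rough sketch of how that could look (not the example from the API documentation): CompletionDemo and processAll are hypothetical names, results are plain strings, and a failed task is simply recorded as "FAILED".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CompletionDemo {
    // Submits every task and collects the results in completion order.
    static List<String> processAll(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        CompletionService<String> completion = new ExecutorCompletionService<>(pool);
        List<String> results = new ArrayList<>();
        try {
            for (Callable<String> task : tasks) {
                completion.submit(task);
            }
            for (int i = 0; i < tasks.size(); i++) {
                // take() blocks until the next task finishes: no polling loop
                Future<String> done = completion.take();
                try {
                    results.add(done.get());
                } catch (ExecutionException e) {
                    results.add("FAILED"); // the task threw an exception
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt
        } finally {
            pool.shutdownNow();
        }
        return results;
    }
}
```

Because take() blocks, the waiting thread does not spin the way the while (!done) loop does.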

+1

Future has a cancel(boolean) method that will interrupt the running thread if true is passed in.

Replace the checks on cancelled with checks on Thread.interrupted(), so that if (!cancelled) becomes if (!Thread.interrupted()), and return as soon as you see the interrupt (currently you do not).

Note that this resets the interrupted flag to false (so if (Thread.interrupted() && Thread.interrupted()) will be false). If you do not want it reset, use Thread.currentThread().isInterrupted(), which keeps the flag for subsequent checks.

Alternatively, you can set the flag again with Thread.currentThread().interrupt();
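The difference between the two checks can be seen in a small sketch. InterruptFlagDemo and observe are hypothetical names; the method interrupts the current thread itself purely for demonstration and clears the flag again before returning.

```java
final class InterruptFlagDemo {
    // Returns what each check observes on the current thread, in order.
    static boolean[] observe() {
        Thread.currentThread().interrupt();                        // set the flag
        boolean peek = Thread.currentThread().isInterrupted();     // reads, keeps the flag
        boolean first = Thread.interrupted();                      // reads, clears the flag
        boolean second = Thread.interrupted();                     // flag already cleared
        Thread.currentThread().interrupt();                        // set the flag again
        boolean restored = Thread.currentThread().isInterrupted();
        Thread.interrupted();                                      // leave the thread clean
        return new boolean[] { peek, first, second, restored };
    }
}
```

Here observe() returns { true, true, false, true }: only the second Thread.interrupted() call sees false, because the first one cleared the flag.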

Also, use this while waiting for the futures to complete:

    for (Iterator<Future<BatchElementStatus>> it = futures.iterator(); it.hasNext();) {
        Future<BatchElementStatus> future = it.next();
        try {
            if (future.isDone()) {
                // avoids a ConcurrentModificationException in the loop; needs a plain
                // ArrayList, as CopyOnWriteArrayList iterators do not support remove()
                it.remove();
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
    // outside the inner for loop and inside the while (or make this the while
    // condition), as a micro-optimization of this check
    if (futures.size() == 0) {
        done = true;
    }
0

Your ElementProcessor can extend java.util.concurrent.FutureTask, which the Javadoc describes as:

A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation.

The FutureTask class is an implementation of Future that implements Runnable, and so may be executed by an Executor.

FutureTask has a cancel method that you can override to perform cancel-specific cleanup. In addition, a FutureTask that is cancelled before it starts will never run, so you do not have to check the status all the time.
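A minimal sketch of that approach, under assumptions: RollbackTask and its rolledBack field are hypothetical, and setting the field stands in for the question's doRollback().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

// Hypothetical FutureTask subclass that rolls back when it is cancelled.
final class RollbackTask extends FutureTask<String> {
    volatile boolean rolledBack = false;

    RollbackTask(Callable<String> work) {
        super(work);
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled) {
            // Runs on the thread that called cancel(), not on the worker.
            rolledBack = true; // stand-in for doRollback()
        }
        return cancelled;
    }
}
```

If such a task is handed to an ExecutorService, it probably has to go through execute(task) rather than submit(task): submit wraps the Runnable in a new future, and cancelling that wrapper would not invoke the overridden cancel. Note also that when the task is already running, the rollback here runs concurrently with the work unless the work itself stops on interrupt.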

0

Source: https://habr.com/ru/post/1369226/

