I need to run N slow calculations (where N is a fairly large number) and would like to do this on M threads, because the slow calculations have a lot of I/O latency. I have put together a small example that works well when all the calculations succeed. However, if a calculation fails, the desired behavior is to stop processing further calculations. Each successful calculation has already written its result to the database, so I just need to determine which calculation failed and stop the calculations that have not yet been started.
My approach is to use the ExecutorService returned by Executors.newFixedThreadPool. However, I don't see a clean way to determine whether one of the calculations failed (in my example, returned false) and to stop the calculations that were submitted to the ExecutorService but not yet assigned a thread from the pool.
Is there a clean way to do this? Is there a better approach for me?
import java.util.*;
import java.util.concurrent.*;
/**
 * Minimal demonstration of running many slow calculations on a fixed thread pool.
 *
 * Every task is queued up front and no result is ever inspected, so a failing
 * calculation does not prevent the remaining ones from running.
 */
class Future
{
    /**
     * Stand-in for one slow calculation. Item 42 is the simulated failure.
     */
    static private class MyWorker implements Callable<Boolean>
    {
        private final Integer item;

        public MyWorker(Integer item)
        {
            this.item = item;
        }

        /**
         * Runs the simulated calculation.
         *
         * @return false immediately for item 42; true for any other item after
         *         printing a progress line and sleeping for one second
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        @Override
        public Boolean call() throws InterruptedException
        {
            if (item == 42)
            {
                return false;
            }
            System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
            Thread.sleep(1000);
            return true;
        }
    }

    /** Number of threads in the fixed pool. */
    static final int NTHREADS = 2;

    /**
     * Queues one worker per number in 1..9999 and then requests an orderly shutdown.
     *
     * shutdown() only stops new submissions; tasks that are already queued still run.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        Queue<Integer> numbers = new LinkedList<Integer>();
        for (int i = 1; i < 10000; i++)
        {
            numbers.add(i);
        }
        System.out.println("Starting thread test.");
        ExecutorService exec = Executors.newFixedThreadPool(NTHREADS);
        for (Integer i : numbers)
        {
            System.out.println("Submit..." + i.toString());
            exec.submit(new MyWorker(i));
            System.out.println("... Done Submit");
        }
        exec.shutdown();
        System.out.println("Exiting thread test.");
    }
}
EDIT: Here's a working implementation of afk's proposal. I still plan to take a look at the callback solution and hope for other suggestions.
import java.util.*;
import java.util.concurrent.*;
/**
 * Runs many slow calculations on a fixed thread pool and stops as soon as one fails.
 *
 * All tasks are submitted up front and their futures are kept in a list. The main
 * thread polls that list, discarding futures that finished successfully, and shuts
 * the pool down immediately when a finished future reports a failure.
 */
class MyFuture
{
    /**
     * Stand-in for one slow calculation. Item 42 is the simulated failure.
     */
    static private class MyWorker implements Callable<Boolean>
    {
        private final Integer item;

        public MyWorker(Integer item)
        {
            this.item = item;
        }

        /**
         * Runs the simulated calculation.
         *
         * @return false for item 42 or when interrupted before finishing;
         *         true after printing a progress line and sleeping for one second
         */
        @Override
        public Boolean call()
        {
            if (item == 42)
            {
                return false;
            }
            System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException ie)
            {
                // shutdownNow() interrupts running workers. Keep the interrupt flag
                // set for the pool and do not claim success for aborted work.
                Thread.currentThread().interrupt();
                return false;
            }
            return true;
        }
    }

    /** Number of threads in the fixed pool. */
    static final int NTHREADS = 4;

    /**
     * Sweeps the list once, removing every future that completed successfully.
     *
     * @param futures pending futures; successful ones are removed in place
     * @return false as soon as a completed future returned false (or null) or
     *         threw an exception; true if no completed future has failed
     * @throws InterruptedException if the calling thread is interrupted in get()
     */
    private static boolean allCompletedSucceeded(List<Future<Boolean>> futures) throws InterruptedException
    {
        Iterator<Future<Boolean>> it = futures.iterator();
        while (it.hasNext())
        {
            Future<Boolean> f = it.next();
            if (!f.isDone())
            {
                continue;
            }
            System.out.println("CHECK RETURN VALUE");
            boolean ok;
            try
            {
                ok = Boolean.TRUE.equals(f.get());
            }
            catch (ExecutionException ee)
            {
                // A task that threw is a failed calculation. Ignoring it would
                // leave its future in the list forever and the poll would never end.
                System.err.println("Calculation threw: " + ee.getCause());
                ok = false;
            }
            if (!ok)
            {
                return false;
            }
            it.remove();
        }
        return true;
    }

    /**
     * Submits one worker per number in 1..99, then polls once per second until
     * every calculation has succeeded or one has failed.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while polling
     */
    public static void main(String[] args) throws InterruptedException
    {
        Queue<Integer> numbers = new LinkedList<Integer>();
        for (int i = 1; i < 100; i++)
        {
            numbers.add(i);
        }
        System.out.println("Starting thread test.");
        ExecutorService exec = Executors.newFixedThreadPool(NTHREADS);
        // LinkedList because finished futures are removed through the iterator.
        List<Future<Boolean>> futures = new LinkedList<Future<Boolean>>();
        try
        {
            for (Integer i : numbers)
            {
                System.out.println("Submit..." + i.toString());
                futures.add(exec.submit(new MyWorker(i)));
                System.out.println("... Done Submit");
            }
            boolean failed = false;
            while (!failed && !futures.isEmpty())
            {
                failed = !allCompletedSucceeded(futures);
                if (!failed && !futures.isEmpty())
                {
                    // Work is still pending; wait before the next sweep.
                    Thread.sleep(1000);
                }
            }
            if (failed)
            {
                System.out.println("IMMEDIATE SHUTDOWN");
            }
        }
        finally
        {
            // Drops queued tasks and interrupts running ones after a failure or an
            // interruption of main; when everything has finished it only ends the pool.
            exec.shutdownNow();
        }
        System.out.println("Exiting thread test.");
    }
}
source
share