, , . , CompletableFuture Java8. Java7 io.netty.util.concurrent.Promise .
, :
, , . , . , ( , ). future.get() ExecutionException.
, , get.
, , :
public <R> CompletableFuture<R> execute(Collection<? extends Callable<R>> jobs) {
final CompletableFuture<R> result = new CompletableFuture<>();
if (jobs == null || jobs.isEmpty()) {
result.completeExceptionally(new IllegalArgumentException("there must be at least one job"));
return result;
}
final ExecutorService service = Executors.newFixedThreadPool(jobs.size());
// accumulate all exceptions to rank later (only if all throw)
final List<RankedException> exceptions = Collections.synchronizedList(Lists.newArrayList());
final AtomicBoolean done = new AtomicBoolean(false);
for (Callable<R> job: jobs) {
service.execute(() -> {
try {
// this is where the actual work is done
R res = job.call();
// set result if still unset
if (done.compareAndSet(false, true)) {
// complete the future, move to service shutdown
result.complete(res);
}
// beware of catching Exception, change to your own checked type
} catch (Exception ex) {
if (ex instanceof RankedException) {
exceptions.add((RankedException) ex);
} else {
exceptions.add(new RankedException(ex));
}
if (exceptions.size() >= jobs.size()) {
// the last to throw and only if all have thrown will run:
Collections.sort(exceptions, (left, right) -> Integer.compare(left.rank, right.rank));
// complete the future, move to service shutdown
result.completeExceptionally(exceptions.get(0));
}
}
});
}
// shutdown also on error, do not wait for this stage
result.whenCompleteAsync((action, t) -> service.shutdownNow());
return result;
}
RankedExeption:
public static class RankedException extends Exception {
private final int rank;
public RankedException(Throwable t) {
this(0, t);
}
public RankedException(int rank, Throwable t) {
super(t);
this.rank = rank;
}
}
, ( , ):
@Rule
public ExpectedException exception = ExpectedException.none();
private static class TestJob implements Callable<Double> {
private final int index;
private final int failOnCount;
TestJob(int index, int failOnCount) {
this.index = index;
this.failOnCount = failOnCount;
}
@Override
public Double call() throws RankedException {
double res = 0;
int count = (int) (Math.random() * 1e6) + 1;
if (count > failOnCount) {
throw new RankedException(count, new RuntimeException("job " + index + " failed"));
}
for (int i = 0; i < count; i++) {
res += Math.random();
}
return res;
}
}
@Test
public void test_success() throws Exception {
List<TestJob> jobs = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
jobs.add(new TestJob(i, (int)(5*1e5)));
}
CompletableFuture<Double> res = execute(jobs);
logger.info("SUCCESS-TEST completed with " + res.get(30, TimeUnit.SECONDS));
}
@Test
public void test_failure() throws Exception {
List<TestJob> jobs = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
jobs.add(new TestJob(i, 0));
}
CompletableFuture<Double> res = execute(jobs);
exception.expect(ExecutionException.class);
try {
res.get(30, TimeUnit.SECONDS);
} catch (ExecutionException ex) {
logger.severe(String.format("FAIL-TEST rank: %s", ((RankedException) ex.getCause()).rank));
throw ex;
}
}
, :
: 115863.20802680103
SEVERE: FAIL-TEST rank: 388150
0
: AtomicBoolean, , .
, , , . .