Java task executor that returns the first completed task

I want to implement something that takes a collection of workers (callables), runs them in parallel on a thread pool, and, as soon as the fastest worker returns a result, gracefully shuts down the other workers (ExecutorService.shutdownNow) so that they stop consuming resources. If all workers end with exceptions, I need to rethrow the most important one (every exception thrown by a worker has an associated importance value). I also need a timeout for the whole executor that terminates all workers if they run for too long.

I was thinking about using RxJava for this, because it seems a short and elegant solution could be achieved with it. But maybe you can suggest better tools for this (CompletableFuture, ForkJoinTask?). Here is the code I have written so far, but it is far from a working solution (I am not very experienced in reactive programming and therefore really struggle with this):

public T run(Collection<? extends Worker<T>> workers, long timeout) {
    ExecutorService executorService = Executors.newFixedThreadPool(workers.size());
    return Observable.from(workers)
            .timeout(timeout, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.from(executorService))
            .map(worker -> {
                try {
                    T res = worker.call();
                    executorService.shutdownNow();
                    return res;
                } catch (Exception e) {
                    throw Exceptions.propagate(e);
                }
            }).doOnError(Exceptions::propagate).toBlocking().first();
}

I would be grateful for any help with this.

+4
3 answers

For this I would use CompletableFuture from Java 8. On Java 7, io.netty.util.concurrent.Promise is an alternative.

, :

  • , ( , )
  • -

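If all jobs fail, future.get() throws an ExecutionException whose cause is the selected exception.

The timeout can be handled by the caller through the timed variant of get.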
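As a minimal sketch of handling the overall timeout on the caller side: the helper below waits with the timed `get` and, on timeout, completes the future exceptionally so that any completion callback registered on it (for example one that shuts an executor down) still runs. The class name `TimeoutGetDemo`, the method `getOrFallback`, and the fallback value are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutGetDemo {
    // Waits up to timeoutMillis for the future; returns fallback if the wait times out.
    static <T> T getOrFallback(CompletableFuture<T> future, long timeoutMillis, T fallback)
            throws InterruptedException, ExecutionException {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Completing the future here fires its whenComplete callbacks,
            // so cleanup attached to the future is not skipped on timeout.
            future.completeExceptionally(e);
            return fallback;
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> pending = new CompletableFuture<>(); // nobody completes this one
        System.out.println(getOrFallback(pending, 50, "timed out"));
        System.out.println(getOrFallback(CompletableFuture.completedFuture("done"), 50, "timed out"));
    }
}
```

Without the `completeExceptionally` call, a future that nobody completes would leave its cleanup callbacks unexecuted after the caller gives up.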

, , :

public <R> CompletableFuture<R> execute(Collection<? extends Callable<R>> jobs) {
  final CompletableFuture<R> result = new CompletableFuture<>();
  if (jobs == null || jobs.isEmpty()) {
    result.completeExceptionally(new IllegalArgumentException("there must be at least one job"));
    return result;
  }
  final ExecutorService service = Executors.newFixedThreadPool(jobs.size());

  // accumulate all exceptions to rank later (only if all throw)
  final List<RankedException> exceptions = Collections.synchronizedList(Lists.newArrayList());
  final AtomicBoolean done = new AtomicBoolean(false);

  for (Callable<R> job: jobs) {
    service.execute(() -> {
      try {
        // this is where the actual work is done
        R res = job.call();
        // set result if still unset
        if (done.compareAndSet(false, true)) {
          // complete the future, move to service shutdown
          result.complete(res);
        }
      // beware of catching Exception, change to your own checked type
      } catch (Exception ex) {
        if (ex instanceof RankedException) {
          exceptions.add((RankedException) ex);
        } else {
          exceptions.add(new RankedException(ex));
        }
        if (exceptions.size() >= jobs.size()) {
          // the last to throw and only if all have thrown will run:
          Collections.sort(exceptions, (left, right) -> Integer.compare(left.rank, right.rank));
          // complete the future, move to service shutdown
          result.completeExceptionally(exceptions.get(0));
        }
      }
    });
  }
  // shutdown also on error, do not wait for this stage
  result.whenCompleteAsync((action, t) -> service.shutdownNow());
  return result;
}
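A small illustration of the first-completion-wins behaviour the method above relies on: `CompletableFuture.complete` and `completeExceptionally` return `true` only for the call that actually completes the future, and later calls leave the stored value untouched. The class and method names here are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class FirstCompletionWins {
    static String demo() {
        CompletableFuture<String> result = new CompletableFuture<>();
        boolean first = result.complete("fast worker");   // completes the future
        boolean second = result.complete("slow worker");  // ignored, already completed
        boolean late = result.completeExceptionally(new IllegalStateException("late failure")); // ignored too
        return first + " " + second + " " + late + " " + result.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true false false fast worker"
    }
}
```

Because of this, a separate flag such as `done` is not strictly required for correctness of the result. It mainly documents intent.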

RankedException:

public static class RankedException extends Exception {
  private final int rank;

  public RankedException(Throwable t) {
    this(0, t);
  }

  public RankedException(int rank, Throwable t) {
    super(t);
    this.rank = rank;
  }
}
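Note that `execute` sorts in ascending order and takes the first element, so it reports the exception with the lowest rank. If a higher rank means "more important" in your scheme (an assumption, the question does not say), selecting the maximum might look like the sketch below. `Ranked` is a hypothetical stand-in for RankedException with a getter added.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class RankSelection {
    // Hypothetical stand-in for RankedException, with an accessor for the rank.
    static class Ranked extends Exception {
        private final int rank;

        Ranked(int rank, String message) {
            super(message);
            this.rank = rank;
        }

        int getRank() {
            return rank;
        }
    }

    // Assumption: a larger rank means a more important exception.
    // Throws NoSuchElementException if the list is empty.
    static Ranked mostImportant(List<Ranked> failures) {
        return Collections.max(failures, Comparator.comparingInt(Ranked::getRank));
    }

    public static void main(String[] args) {
        List<Ranked> failures = Arrays.asList(
                new Ranked(3, "minor"), new Ranked(9, "critical"), new Ranked(5, "moderate"));
        System.out.println(mostImportant(failures).getMessage()); // prints "critical"
    }
}
```

Using `Collections.max` (or `Collections.min` for the opposite ordering) avoids sorting the whole list just to read one element.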

, ( , ):

@Rule
public ExpectedException exception = ExpectedException.none();

private static class TestJob implements Callable<Double> {
  private final int index;
  private final int failOnCount;

  TestJob(int index, int failOnCount) {
    this.index = index;
    this.failOnCount = failOnCount;
  }

  @Override
  public Double call() throws RankedException {
    double res = 0;
    int count = (int) (Math.random() * 1e6) + 1;
    if (count > failOnCount) {
      throw new RankedException(count, new RuntimeException("job " + index + " failed"));
    }
    for (int i = 0; i < count; i++) {
      res += Math.random();
    }
    return res;
  }
}

@Test
public void test_success() throws Exception {
  List<TestJob> jobs = Lists.newArrayList();
  for (int i = 0; i < 10; i++) {
    jobs.add(new TestJob(i, (int)(5*1e5))); // 50% should be alright
  }
  CompletableFuture<Double> res = execute(jobs);
  logger.info("SUCCESS-TEST completed with " + res.get(30, TimeUnit.SECONDS));
}

@Test
public void test_failure() throws Exception {
  List<TestJob> jobs = Lists.newArrayList();
  for (int i = 0; i < 10; i++) {
    jobs.add(new TestJob(i, 0)); // all should fail
  }
  CompletableFuture<Double> res = execute(jobs);
  exception.expect(ExecutionException.class);
  try {
    res.get(30, TimeUnit.SECONDS);
  } catch (ExecutionException ex) {
    logger.severe(String.format("FAIL-TEST rank: %s", ((RankedException) ex.getCause()).rank));
    throw ex;
  }
}

, :

INFO: SUCCESS-TEST completed with 115863.20802680103

SEVERE: FAIL-TEST rank: 388150

0

: AtomicBoolean, , .

, , , . .

+2

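This can be done with RxJava. Use flatMap, with subscribeOn inside the flatMap, to run the workers in parallel. To keep a failing worker from terminating the stream, materialize each inner observable, and stop at the first value with takeUntil. The timeout operator covers the overall time limit.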

ExecutorService executorService =
    Executors.newFixedThreadPool(workers.size());
Scheduler scheduler = Schedulers.from(executorService);
return Observable
    .from(workers)
    .flatMap(worker -> 
         Observable.fromCallable(worker)
             .subscribeOn(scheduler)
             .materialize())
    .takeUntil(notification -> notification.hasValue())
    .toList() 
    .timeout(30, TimeUnit.SECONDS)
    .flatMap(
        list -> {
            Notification<T> last = list.get(list.size() - 1);
            if (last.hasValue()) 
                return Observable.just(last.getValue());
            else {
                // TODO get the error notification from the list 
                // with the highest importance and emit
                return Observable.<T>error(err);
            }
        }).subscribe(subscriber);
+2

Have you looked at RxJava's amb operator? You would need to verify that it finishes on the first onCompleted, though, since the documentation says nothing about that.
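For comparison, the JDK has a rough analogue of "take whichever worker finishes first" in `ExecutorService.invokeAny`: it returns the result of one task that completed without throwing, cancels the tasks that have not completed, and has a timed overload. The sketch below is an illustration under those semantics, not a full answer to the question: if every task fails, `invokeAny` throws an ExecutionException without letting you rank the failures. The names `InvokeAnyDemo` and `firstResult` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class InvokeAnyDemo {
    // Runs all workers in parallel and returns the first successful result.
    static <T> T firstResult(List<Callable<T>> workers, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService pool = Executors.newFixedThreadPool(workers.size());
        try {
            // Blocks until one task succeeds, all tasks fail, or the timeout elapses.
            return pool.invokeAny(workers, timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            // Interrupt whatever is still running so it stops consuming resources.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> workers = new ArrayList<>();
        workers.add(() -> { Thread.sleep(2000); return "slow"; });
        workers.add(() -> "fast");
        workers.add(() -> { throw new IllegalStateException("broken"); });
        System.out.println(firstResult(workers, 5000));
    }
}
```

A failing worker does not end the race here: its exception is only reported if no other worker succeeds.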

0

Source: https://habr.com/ru/post/1651905/

