In java, how do I handle CompletableFutures and get the first desired result that completes?

Usually with CompletableFuture I would call thenApply or some other method to do something as soon as the result is available. However, I now have a situation where I want to process the results until I get a positive result, and then ignore all further results.

If I just wanted to get the first available result, I could use CompletableFuture.anyOf (although I don't like converting the list to an array just for calling anyOf). But that is not what I want. I want to get the first result, and if it does not have the desired result, I want to process the second available result and so on until I get the desired result.

Here is a simple example that goes through all the results and returns the first value it finds that is greater than 9. (Note that this is not my real task. This is a simple example.)

public Integer findFirstGt9(List<CompletableFuture<Integer>> results) { for(CompletableFuture<Integer> result : results) { Integer v = result.get(); if(v > 9) return v; } return null; } 

Of course, this example goes through the results from the very beginning, and not by searching for the results as they are completed. So here is what accomplishes what I want, but with much more complex code.

 public Integer findFirstGt9(List<CompletableFuture<Integer>> results) { AtomicInteger finalResult = new AtomicInteger(); CountDownLatch latch = new CountDownLatch(results.size()); for(CompletableFuture<Integer> result : results) { result.whenComplete((v,e) -> { if(e!=null) { Logger.getLogger(getClass()).error("",e); } else if(v > 9) { finalResult.set(v); while(latch.getCount() > 0) latch.countDown(); return; } latch.countDown(); }); } latch.await(); if(finalResult.get() > 9) return finalResult.get(); return null; } 

Is there any api where I can do this?

 public Integer findFirstGt9(List<CompletableFuture<Integer>> results) { Iterator<Integer> resultIt = getResultsAsAvailable(results); for(; resultIt.hasNext();) { Integer v = resultIt.next(); if(v > 9) return v; } return null; } 

Or even better:

 public Integer findFirstGt9(List<CompletableFuture<Integer>> results) { return getFirstMatch(results, r -> {return r > 9;}); } 
+5
source share
2 answers

You can use the following solution:

 public static <T> CompletableFuture<T> anyMatch( List<? extends CompletionStage<? extends T>> l, Predicate<? super T> criteria) { CompletableFuture<T> result=new CompletableFuture<>(); Consumer<T> whenMatching=v -> { if(criteria.test(v)) result.complete(v); }; CompletableFuture.allOf(l.stream() .map(f -> f.thenAccept(whenMatching)).toArray(CompletableFuture<?>[]::new)) .whenComplete((ignored, t) -> result.completeExceptionally(t!=null? t: new NoSuchElementException())); return result; } 

The basic principle is the same as in the Answers to columns , however there are some differences:

  • The general signature is more flexible.
  • The creation of the array needed for CompletableFuture.allOf is combined with the registration of the subsequent action on the source futures. As a side effect, the allOf action allOf depends on the completion of all attempts to complete the result, and not just on the original futures. This makes the actual desired dependency explicit. That way, it will work even if you replace all thenAccept with thenAcceptAsync s.
  • This decision ends with a NoSuchElementException , and not returning null if the result does not meet the criteria. If at least one future is completed in exceptional cases and a successful completion with the result of the match is not performed, one of the exceptions occurred.

You can try with

 List<CompletableFuture<Integer>> list=Arrays.asList( CompletableFuture.supplyAsync(()->5), CompletableFuture.supplyAsync(()->{throw new RuntimeException(); }), CompletableFuture.supplyAsync(()->42), CompletableFuture.completedFuture(0) ); anyMatch(list, i -> i>9) .thenAccept(i->System.out.println("got "+i)) // optionally chain with: .whenComplete((x,t)->{ if(t!=null) t.printStackTrace(); }); 
+2
source

I don't know any such API in the JDK or elsewhere. You can roll on your own.

You can take advantage of the fact that CompletableFuture#complete (and completeExceptionally ) does nothing if the future is already completed.

If this is not yet completed, sets the value returned by get() and the methods associated with this value.

Create a new final result, CompletableFuture . Add a sequel to each of your futures that tries to complete to get this final result, if your condition applies. This future will end with a first success. However, if that fails, you probably need null . You can create a CompletableFuture with allOf to also complete final result with null .

Sort of

 public static <T> CompletableFuture<T> firstOrNull(List<CompletableFuture<T>> futures, Predicate<T> condition) { CompletableFuture<T> finalResult = new CompletableFuture<>(); // attempt to complete on success futures.stream().forEach(future -> future.thenAccept(successResult -> { if (condition.test(successResult)) finalResult.complete(successResult); })); CompletableFuture<?> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); all.thenRun(() -> { finalResult.complete(null); }); return finalResult; } 

You pay the overhead of no-op calls.

You can change null to some default value or handle exceptions differently ( completeExceptionally , as soon as an error occurs). You will need to use whenComplete or handle instead of thenAccept above to access the Exception .

+3
source

Source: https://habr.com/ru/post/1247089/


All Articles