Java CompletableFuture: only the first result

After reading this article https://community.oracle.com/docs/DOC-995305 on an Oracle site, I'm trying to implement the pattern described in its section on "two-to-one" selecting patterns:

"This last category of patterns also contains two-to-one patterns. But this time, instead of executing the downstream element once the two upstream elements are completed, the downstream element is executed when one of the two upstream elements is completed. This can be very useful if we want to resolve a domain name, for example. Instead of querying only one domain name server, we might find it more efficient to query a group of domain name servers. We do not expect different results from the different servers, so we do not need more answers than the first one we get. All the other queries can be safely canceled."

It's easy to implement this scenario when I have only 2 CompletableFutures, but I cannot implement the same scenario with 3 or more CompletableFutures.

I tried this:

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
    CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));

    cf1.applyToEither(
            cf2, s1 -> cf2.applyToEither(
                    cf3, s2 -> cf3.applyToEither(
                            cf4, s3 -> "First result is: " + s3))).thenAccept(System.out::println).join();

FutureMain is my class, and this is its generateString method:

public static String generateString(String input) {
    Random r = new Random();
    int millis = r.nextInt(6) * 1000;
    System.out.println(input + " " + millis);
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return input + ": " + millis;
}

I successfully combined several CompletableFutures for the case where I want all of them to be completed:

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
    CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));

    CompletableFuture<String> cf5 = CompletableFuture.allOf(cf1, cf2, cf3, cf4).thenApply(
            s -> elaborate(cf1.join(), cf2.join(), cf3.join(), cf4.join())); 

    cf5.thenAccept(System.out::println).join();

Any suggestion?

2 answers

The "two-to-one" selecting pattern says:

"the downstream element is executed when one of the two upstream elements is completed."

The same idea extends to more than two upstream elements. For example, you can chain them with the Java 8 Stream API:

// The seed future is never completed, so it can never win the race.
CompletableFuture<String> blocked = new CompletableFuture<>();
CompletableFuture<String> upstreams = Stream.of(cf1, cf2, cf3, cf4).reduce(blocked,
        (it, upstream) -> it.applyToEither(upstream, Function.identity()));

upstreams.thenAccept(System.out::println).join(); // prints the result of whichever future completes first
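
To make the reduction above easier to try in isolation, here is a minimal self-contained sketch. The class name `FirstOfDemo` and the method name `firstOf` are made up for illustration and do not come from the answer. To keep the outcome deterministic, the demo uses futures that are either already completed or never completed, instead of timed tasks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper class, used only to illustrate the reduce-based chaining.
class FirstOfDemo {

    // Folds all futures with applyToEither, starting from a seed that is never completed.
    static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> seed = new CompletableFuture<>(); // intentionally left pending
        return futures.stream()
                .reduce(seed, (acc, next) -> acc.applyToEither(next, Function.identity()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending1 = new CompletableFuture<>(); // never completed
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> pending2 = new CompletableFuture<>(); // never completed

        // Only "fast" is ever completed, so it is the result of the chain.
        System.out.println(firstOf(Arrays.asList(pending1, fast, pending2)).join());
    }
}
```

Note that applyToEither also propagates an exceptional completion, so if the first future to finish has failed, the chained result fails as well.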

For example, with each CompletableFuture created by supplyAsync, the futures can also be chained directly with applyToEither:

CompletableFuture<String> cf1 = supplyAsync(blocked(String.class));
CompletableFuture<String> cf2 = supplyAsync(returnValueLater("bar"));
CompletableFuture<String> cf3 = supplyAsync(blocked(String.class));
CompletableFuture<String> cf4 = supplyAsync(returnValue("foo"));

CompletableFuture<String> upstreams = cf1.applyToEither(cf2, Function.identity())
                                         .applyToEither(cf3, Function.identity())
                                         .applyToEither(cf4, Function.identity());

upstreams.thenAccept(System.out::println).join();// print "foo"

private <T> Supplier<T> returnValue(T value) {
    return returnValue(() -> value);
}

private <T> Supplier<T> blocked(Class<T> type) {
    return returnValue(() -> {
        Thread.currentThread().join();
        return null;
    });
}

private <T> Supplier<T> returnValueLater(T value) {
    return returnValue(() -> {
        Thread.sleep(100);
        return value;
    });
}

private <T> Supplier<T> returnValue(Callable<T> value) {
    return () -> {
        try {
            return value.call();
        } catch (Exception e) { throw new RuntimeException(e); }
    };
}

import org.junit.jupiter.api.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.*;
import static java.util.Arrays.asList;
import static java.util.concurrent.CompletableFuture.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
public class CompletableFuturePatternTest {

    @Test @DisplayName("Two-to-One Selecting Pattern")
    void selectingManyToOne() throws Throwable {
        String user = select("select user from User", String.class)
                .from(availableServers())
                .getFirstResult();

        assertThat(user, equalTo("Joe"));
    }

    @Test @DisplayName("Two-to-One Combining Pattern")
    void combiningManyToOne() throws Throwable {
        List<String> users = select("select user from User", String.class)
                .from(availableServers())
                .list();

        assertThat(users, equalTo(asList("Bob", "Joe", "Doe")));
    }

    @Test @DisplayName("One-to-One Pattern")
    void waitUntilUpstreamCompleted() throws Throwable {
        String user = select("select user from User", String.class)
                .from(availableServers())
                .to(String::toUpperCase);

        assertThat(user, equalTo("JOE"));
    }

    private CompletableFuture<String>[] availableServers() {
        return new CompletableFuture[]{
                server(returnValueLater("Bob")),
                server(returnValue("Joe")),
                server(returnValueLater("Doe")),
        };
    }

    private <T> CompletableFuture<T> server(Supplier<T> supplier) {
        return supplyAsync(supplier);
    }

    private <T> Supplier<T> returnValue(T value) {
        return returnValue(() -> value);
    }


    private <T> Supplier<T> returnValueLater(T value) {
        return returnValue(() -> {
            Thread.sleep(500);
            return value;
        });
    }

    private <T> Supplier<T> returnValue(Callable<T> value) {
        return () -> {
            try {
                return value.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    private <T> Query<T> select(String query, Class<T> type) {
        return new Query<T>() {
            private CompletableFuture<T>[] upstreams;

            @Override
            public Query<T> from(CompletableFuture<T>... upstreams) {
                this.upstreams = upstreams;
                return this;
            }

            @Override
            public T getFirstResult() throws Exception {
                return selecting().get();
            }

            @Override
            public <R> R to(Function<T, R> mapper) throws Exception {
                return selecting().thenApply(mapper).get();
            }

            private CompletableFuture<T> selecting() {
                return upstreams(blocked(), this::selecting);
            }

            private CompletableFuture<T> selecting(CompletableFuture<T> primary,
                                                   CompletableFuture<T> upstream) {
                return primary.applyToEitherAsync(upstream, Function.identity());
            }

            private CompletableFuture<T> blocked() {
                return new CompletableFuture<>();
            }

            @Override
            public List<T> list() throws Exception {
                return upstreams(collector(), this::combine, this::combiner).get();
            }

            private CompletableFuture<List<T>> collector() {
                return completedFuture(new ArrayList<>());
            }

            private CompletableFuture<List<T>> combine(CompletableFuture<List<T>> primary,
                                                       CompletableFuture<T> upstream) {
                return primary.thenCombineAsync(upstream, this::concat);
            }

            private List<T> concat(List<T> result, T value) {
                result.add(value);
                return result;
            }

            private CompletableFuture<List<T>> combiner(CompletableFuture<List<T>> primary
                    , CompletableFuture<List<T>> secondary) {

                return primary.thenCombineAsync(secondary, this::concat);
            }

            private <T> List<T> concat(List<T> primary, List<T> secondary) {
                primary.addAll(secondary);
                return primary;
            }

            private CompletableFuture<T> upstreams(CompletableFuture<T> identity,
                                                   BinaryOperator<CompletableFuture<T>> accumulator) {
                return upstreams(identity, accumulator, accumulator);
            }

            private <U> CompletableFuture<U> upstreams(CompletableFuture<U> identity
                    , BiFunction<CompletableFuture<U>, CompletableFuture<T>, CompletableFuture<U>> accumulator
                    , BinaryOperator<CompletableFuture<U>> combiner) {
                return Stream.of(upstreams).reduce(identity, accumulator, combiner);
            }

        };
    }

    interface Query<T> {
        Query<T> from(CompletableFuture<T>... upstreams);

        T getFirstResult() throws Exception;

        <R> R to(Function<T, R> mapper) throws Exception;

        List<T> list() throws Exception;
    }
}
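
The "two-to-one" pattern does not scale well to an arbitrary number of futures. That is why the convenience methods allOf and anyOf exist. Since you already use the former, you can use the latter in the same way: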

CompletableFuture<String> cf1
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
CompletableFuture<String> cf2
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
CompletableFuture<String> cf3
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
CompletableFuture<String> cf4
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));

CompletableFuture<String> cf5 = CompletableFuture.anyOf(cf1, cf2, cf3, cf4)
    .thenApply(String.class::cast); 

cf5.thenAccept(System.out::println).join();
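
The question also mentions that the remaining requests can be cancelled once the first answer arrives. A possible sketch of that on top of anyOf follows. The names `AnyOfCancelDemo` and `firstAndCancelRest` are hypothetical. It assumes that marking the losing futures as cancelled is enough for the caller; cancelling a CompletableFuture does not interrupt a task that is already running.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper class, used only to illustrate anyOf plus cancellation.
class AnyOfCancelDemo {

    // Returns the first completed result and cancels the remaining futures afterwards.
    @SafeVarargs
    static <T> CompletableFuture<T> firstAndCancelRest(CompletableFuture<? extends T>... futures) {
        // anyOf yields CompletableFuture<Object>; the cast is safe because every input produces a T.
        @SuppressWarnings("unchecked")
        CompletableFuture<T> first = CompletableFuture.anyOf(futures).thenApply(o -> (T) o);
        first.whenComplete((value, error) -> {
            for (CompletableFuture<? extends T> f : futures) {
                f.cancel(false); // no effect on futures that are already completed
            }
        });
        return first;
    }

    public static void main(String[] args) {
        CompletableFuture<String> loser = new CompletableFuture<>(); // never completed
        CompletableFuture<String> winner = CompletableFuture.completedFuture("fast");

        CompletableFuture<String> result = firstAndCancelRest(loser, winner);
        System.out.println(result.join());       // the winner's value
        System.out.println(loser.isCancelled()); // the loser has been cancelled
    }
}
```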

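Note that anyOf completes as soon as any of the futures completes, even if that future completed exceptionally.

If you instead want the first successful result, and a failure only when none of the futures succeeds, you need a helper along these lines: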

public static <T>
    CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {

    CompletableFuture<T> f=new CompletableFuture<>();
    Consumer<T> complete=f::complete;
    CompletableFuture.allOf(
        l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
    ).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
    return f;
}

It can be used like this:

CompletableFuture<String> cf1
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
CompletableFuture<String> cf2
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
CompletableFuture<String> cf3
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
CompletableFuture<String> cf4 // to demonstrate that this quick failure is not preferred
    = CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); });

CompletableFuture<String> cf5 = anyOf(Arrays.asList(cf1, cf2, cf3, cf4));

cf5.thenAccept(System.out::println).join();
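
For a deterministic check of that behaviour, the helper can be exercised with futures that are already completed. This is only a sketch: the class name `FirstSuccessDemo` is made up, the method is renamed to `firstSuccessful` to avoid confusion with CompletableFuture.anyOf, and toCompletableFuture() is added on the assumption that the input stages may not be CompletableFuture instances.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

// Hypothetical demo class; firstSuccessful mirrors the helper shown above under a different name.
class FirstSuccessDemo {

    static <T> CompletableFuture<T> firstSuccessful(List<? extends CompletionStage<? extends T>> stages) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Consumer<T> complete = result::complete; // the first successful value wins
        CompletableFuture.allOf(
            stages.stream()
                  .map(s -> s.thenAccept(complete).toCompletableFuture())
                  .toArray(CompletableFuture<?>[]::new)
        // Runs once all stages are done and at least one failed; it has no effect
        // if a successful value has already completed the result.
        ).exceptionally(ex -> { result.completeExceptionally(ex); return null; });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<String> ok = CompletableFuture.completedFuture("ok");

        // The failure is ignored because another future succeeded.
        System.out.println(firstSuccessful(Arrays.asList(failed, ok)).join());
        // With no successful future, the result completes exceptionally.
        System.out.println(firstSuccessful(Collections.singletonList(failed)).isCompletedExceptionally());
    }
}
```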

Source: https://habr.com/ru/post/1674756/

