CompletableFuture: calling a void function asynchronously

I am trying to implement a database query with a retry strategy for certain database exceptions. The code for the retry strategy is not relevant here, so I have left it out. As you can see in the code below, populateData() creates a RetryingCallable (retryCallable) from a retry strategy and a Callable.

In getDataFromDB I fetch data from the database, and populateData then puts it into a global hash map that serves as an application-level cache.

This code works as expected. I would like to call populateData from another class, but that would be a blocking call. Because it involves a database query and a retry strategy, it can be slow, so I want to call populateData asynchronously.

How can I use CompletableFuture or FutureTask to achieve this? CompletableFuture.runAsync expects a Runnable, and CompletableFuture.supplyAsync expects a Supplier. I have not implemented anything like this before, so any best-practice tips would be helpful.

    class TestCallableRetry {

        // Callable.call() declares a checked Exception, so it is propagated here
        public void populateData() throws Exception {
            final Callable<Set<String>> retryCallable =
                    new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
            Set<String> data = retryCallable.call();
            if (data != null && !data.isEmpty()) {
                // store data in a global hash map
            }
        }

        private Callable<Set<String>> getDataFromDB() {
            return new Callable<Set<String>>() {
                @Override
                public Set<String> call() {
                    // returns data from database
                }
            };
        }
    }

    class InvokeCallableAsynchronously {

        public void putDataInGlobalMap() {
            // call populateData asynchronously
        }
    }
2 answers

If you split your populateData method into two parts, a Supplier that retrieves the data and a Consumer that stores it, it becomes easy to chain them with CompletableFuture.

    // Signature compatible with Supplier<Set<String>>
    private Set<String> fetchDataWithRetry() {
        final RetryingCallable<Set<String>> retryCallable =
                new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
        try {
            return retryCallable.call();
        } catch (Exception e) {
            log.error("Call to database failed", e);
            return Collections.emptySet();
        }
    }

    // Signature compatible with Consumer<Set<String>>
    private void storeData(Set<String> data) {
        if (!data.isEmpty()) {
            // store data in a global hash map
        }
    }

Then in populateData() :

    private ExecutorService executor = Executors.newCachedThreadPool();

    public void populateData() {
        CompletableFuture
                .supplyAsync(this::fetchDataWithRetry, executor)
                .thenAccept(this::storeData);
    }

Using the version of supplyAsync that accepts an Executor is optional. If you use the single-argument version, your task runs in the common pool (ForkJoinPool.commonPool()); that is fine for short tasks, but not for tasks that block.
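
If the caller only needs to fire the existing void method without splitting it, CompletableFuture.runAsync with a dedicated executor is an alternative. The following is a minimal sketch, not the asker's actual code: the class name AsyncPopulateSketch, the CACHE set, and the body of populateData are hypothetical stand-ins.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncPopulateSketch {
    // Hypothetical stand-in for the application-level cache.
    static final Set<String> CACHE = ConcurrentHashMap.newKeySet();

    // Hypothetical stand-in for the blocking populateData() from the question.
    static void populateData() {
        CACHE.add("building-1");
    }

    // Runs the void method on the given executor; the returned future
    // completes (with no value) once populateData() has finished.
    static CompletableFuture<Void> populateDataAsync(ExecutorService executor) {
        return CompletableFuture.runAsync(AsyncPopulateSketch::populateData, executor);
    }

    public static void main(String[] args) {
        // A dedicated pool keeps blocking database work off the common pool.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Void> future = populateDataAsync(executor);
            future.join(); // demo only; real callers would attach callbacks instead
            System.out.println(CACHE);
        } finally {
            executor.shutdown();
        }
    }
}
```

The pool size of 2 is arbitrary; whatever executor is used should be shut down when the application stops, otherwise its non-daemon threads can keep the JVM alive.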


CompletableFuture offers various utility methods that you can combine, and it is really worth exploring all of them.

Let's start with the populateData method. From its name you can infer that it should receive data from somewhere.

Its signature might look like this:

    void populateData(Supplier<? extends Collection<Building>> dataSupplier);

A Supplier, as the name implies, is simply something that provides us with some data.

getDataFromDB() seems well suited to the Supplier role.

 private Set<Building> getDataFromDB() // supply a building collection 

We want populateData to execute asynchronously and to report whether the operation completed correctly or not.

So populateData should return a future that tells us later how it went.

Change the signature to:

 CompletableFuture<Result> populateData(Supplier<? extends Collection<Building>> supplier); 

Now let's see what the body of the method looks like:

    CompletableFuture<Result> populateData(Supplier<? extends Collection<Building>> supplier) {
        return CompletableFuture // create new completable future from factory method
                .supplyAsync(supplier) // execute the supplier method (getDataFromDB() in our case)
                .thenApplyAsync(data -> { // here we can work on the data supplied
                    if (data == null || data.isEmpty()) return new Result(false);
                    // some heavy operations
                    for (Building building : data) {
                        // do something
                    }
                    return new Result(true); // return dummy positive result data
                })
                .handleAsync((result, throwable) -> {
                    // check if there was any exception
                    if (throwable != null) {
                        // an exception was thrown
                        Log.log(throwable);
                        return new Result(false);
                    }
                    return result;
                });
    }

Now we can call populateData from somewhere and attach another callback that runs once it has finished executing asynchronously.

    populateData(TestCallableRetry::getDataFromDB).thenAccept(result -> {
        if (!result.success) {
            // things went bad... retry ??
        }
    });

What comes next depends on how you want to apply the retry strategy. If you only need to retry once, you can simply call populateData a second time inside thenAcceptAsync.
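
A single retry can be sketched as follows. Note that this sketch uses thenCompose rather than thenAcceptAsync, so that the caller receives the outcome of the second attempt; that is a substitution, not what the answer literally proposes. The Result class, the simplified populateData taking a Supplier<Boolean>, and the name populateDataWithOneRetry are all hypothetical, since the answer does not show those definitions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetryOnceSketch {
    // Hypothetical minimal Result type; the answer does not show its definition.
    static final class Result {
        final boolean success;
        Result(boolean success) { this.success = success; }
    }

    // Simplified stand-in for populateData: the supplier reports success as a
    // boolean, and any exception is mapped to a failed Result.
    static CompletableFuture<Result> populateData(Supplier<Boolean> supplier) {
        return CompletableFuture
                .supplyAsync(supplier)
                .thenApply(ok -> new Result(ok))
                .exceptionally(t -> new Result(false));
    }

    // Runs populateData and, if the first attempt failed, runs it exactly once more.
    static CompletableFuture<Result> populateDataWithOneRetry(Supplier<Boolean> supplier) {
        return populateData(supplier).thenCompose(first -> {
            if (first.success) {
                return CompletableFuture.completedFuture(first);
            }
            return populateData(supplier); // second and last attempt
        });
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated flaky operation: fails on the first call, succeeds on the second.
        Supplier<Boolean> flaky = () -> attempts.incrementAndGet() > 1;
        Result result = populateDataWithOneRetry(flaky).join();
        System.out.println(result.success + " after " + attempts.get() + " attempts");
    }
}
```

For more than one retry, or for delays between attempts, a dedicated retry wrapper such as the asker's RetryingCallable is probably a better fit than nesting callbacks.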

You should also catch exceptions in your supplier and rethrow them wrapped in java.util.concurrent.CompletionException, since CompletableFuture handles those smoothly.
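
One way to do that wrapping is a small adapter from Callable to Supplier. This is a sketch under assumptions: the names CompletionExceptionSketch, asSupplier, and fetchAsync are hypothetical, and returning an empty set on failure is just one possible fallback.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

class CompletionExceptionSketch {
    // Adapts a Callable (which may throw checked exceptions) to a Supplier
    // by rethrowing any exception wrapped in an unchecked CompletionException.
    static <T> Supplier<T> asSupplier(Callable<T> callable) {
        return () -> {
            try {
                return callable.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        };
    }

    // Runs the query asynchronously and falls back to an empty set on failure.
    static CompletableFuture<Set<String>> fetchAsync(Callable<Set<String>> query) {
        return CompletableFuture
                .supplyAsync(asSupplier(query))
                .exceptionally(t -> {
                    // Unwrap the CompletionException to reach the original exception.
                    Throwable cause = t.getCause() != null ? t.getCause() : t;
                    System.err.println("Query failed: " + cause);
                    return Collections.emptySet();
                });
    }

    public static void main(String[] args) {
        // Simulated failing query, standing in for a database call.
        Set<String> data = fetchAsync(() -> {
            throw new java.io.IOException("database unavailable");
        }).join();
        System.out.println("Rows: " + data.size());
    }
}
```

With an adapter like this, the asker's existing Callable could be passed to supplyAsync without a try/catch at every call site.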


Source: https://habr.com/ru/post/1275853/

