Using Spring 4.0's New ListenableFuture with Callbacks - Odd Results

I have a web application that accepts an array of identifiers, queries an external web service for each identifier one at a time, and publishes each result to the WebSocket client through the STOMP broker as it arrives. I can get this to work using simple Futures, but I'm trying to use Spring 4's new ListenableFuture and provide a callback instead.

The working code uses a ThreadPoolTaskExecutor, which is defined in my root configuration. I have a class called "SosQuery" with a method called "test" that is annotated with @Async and returns an AsyncResult. Here is my working code, called from a service class in the root context:

    @Override
    public void test(String[] oids) throws Exception {
        List<Future<String>> futures = new ArrayList<Future<String>>();

        for (String oid : oids) {
            futures.add(sosQuery.test(oid));
        }

        while (!futures.isEmpty()) {
            List<Future<String>> done = new ArrayList<Future<String>>();
            for (Future<String> future : futures) {
                if (future.isDone()) {
                    messagingTemplate.convertAndSendToUser("me", "/queue/observation", future.get());
                    done.add(future);
                }
            }
            futures.removeAll(done);
        }
    }
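
The loop above spins on isDone() until every future has finished. For comparison, here is a minimal plain-JDK sketch of handling results in completion order without polling, using java.util.concurrent.ExecutorCompletionService. It is not Spring-specific and is not the approach taken in this question. The class name, the query helper, and the pool size are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderSketch {

    // Hypothetical stand-in for the external web service call.
    static String query(String oid) {
        return "observation for " + oid;
    }

    // Submits one task per oid and collects the results in completion order.
    static List<String> queryAll(String[] oids) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an assumption
        try {
            CompletionService<String> completion =
                    new ExecutorCompletionService<String>(pool);
            for (final String oid : oids) {
                completion.submit(new Callable<String>() {
                    @Override
                    public String call() {
                        return query(oid);
                    }
                });
            }
            List<String> results = new ArrayList<String>();
            for (int i = 0; i < oids.length; i++) {
                // take() blocks until the next task finishes; get() rethrows task failures.
                // This is the point where each result could be published to the client.
                results.add(completion.take().get());
            }
            return results;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String result : queryAll(new String[] {"a", "b", "c"})) {
            System.out.println(result);
        }
    }
}
```

Results come back in whatever order the tasks finish, so the order of the returned list is not guaranteed to match the order of the input array.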

So I changed SosQuery, removing @Async and returning a plain "String", and switched to a SimpleAsyncTaskExecutor. Here is the ListenableFuture version:

    @Override
    public void test(String[] oids) throws Exception {
        for (final String oid : oids) {
            ListenableFuture<String> task = asyncTaskExecutor.submitListenable(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    String result = sosQuery.test(oid);
                    logger.debug("result for sosQuery: " + result);
                    return result;
                }
            });

            task.addCallback(new ListenableFutureCallback<String>() {

                @Override
                public void onSuccess(String result){
                    if (result == null){
                        result = "ITS NULL";
                    }
                    messagingTemplate.convertAndSendToUser("me", "/queue/observation", result);
                }

                @Override
                public void onFailure(Throwable t){
                    logger.error("Error executing callback.", t);
                }
            });
        }
    }

Here is the odd part: the tasks start, call() executes, SosQuery returns a result, and the logger prints it. But in onSuccess, the String is NULL.

onFailure is never called, so no error is reported. I have only seen ListenableFutures used with AsyncRestTemplate. Does anyone know what I am doing wrong?


Remove @Async from the SosQuery.test method.

    ListenableFuture<String> task = asyncTaskExecutor.submitListenable(new Callable<String>() {
        @Override
        public String call() throws Exception {
            String result = sosQuery.test(oid);
            logger.debug("result for sosQuery: " + result);
            return result;
        }
    });

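call() is already executed asynchronously by the executor. With @Async on top of that, Spring dispatches SosQuery.test to yet another thread and the call returns immediately (with null, because the method is not declared to return a Future).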
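
The effect of stacking @Async on top of an executor task can be imitated in plain Java. The sketch below is not Spring code and does not reproduce Spring's implementation. It only mimics the observable behaviour of an @Async proxy around a method declared to return String: the work is handed to another thread and the caller receives null straight away. All class and method names here are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DoubleAsyncSketch {

    // Hypothetical stand-in for the real SosQuery.test(oid).
    static String realTest(String oid) {
        return "observation for " + oid;
    }

    // Imitates an @Async proxy around a method that returns String:
    // the work moves to another thread and the caller gets null at once.
    static String asyncProxiedTest(final String oid, ExecutorService asyncPool) {
        asyncPool.submit(new Runnable() {
            @Override
            public void run() {
                realTest(oid); // the real result is computed here and then discarded
            }
        });
        return null;
    }

    // Runs call() on an outer executor, roughly as submitListenable would,
    // either through the imitated proxy or by calling the method directly.
    static String runOnExecutor(final String oid, final boolean withAsyncProxy) {
        final ExecutorService outer = Executors.newSingleThreadExecutor();
        final ExecutorService inner = Executors.newSingleThreadExecutor();
        try {
            Future<String> task = outer.submit(new Callable<String>() {
                @Override
                public String call() {
                    return withAsyncProxy ? asyncProxiedTest(oid, inner) : realTest(oid);
                }
            });
            return task.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            outer.shutdown();
            inner.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnExecutor("42", true));  // prints null
        System.out.println(runOnExecutor("42", false)); // prints observation for 42
    }
}
```

With the proxy in the way, the value that reaches the future (and so the callback) is null even though the real method ran. Calling the method directly from call() delivers the actual result.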

Also note this from the SimpleAsyncTaskExecutor documentation:

    This implementation does not reuse threads! Consider a thread-pooling TaskExecutor
    implementation instead, in particular for executing a large number of short-lived tasks.

Source: https://habr.com/ru/post/1540724/

