Quick n dirty
Is it possible to refactor a traditional event listener into a Java 8 stream, so that listener events become the stream source?
Long story
The client submits an arbitrary job and then listens for results:
    Client client = new JobClient();
    client.addTaskListener(this);
    client.submitJobAsync(new MultiTaskJob()); // returns void, important (see below)

    public void onTaskResult(TaskResult result) {
        if (result.isLastResult())
            aggregateJobResults(result);
        else
            processResult(result);
    }
Problem
For any job submitted, the client receives n results, but it does not know in advance how many results it will receive (it uses isLastResult() to determine when to stop and aggregate).
Goal
I want to refactor the listener into a "supplier" or something similar, so that onTaskResult() becomes the source of the stream:
    Supplier<TaskResult> taskResultSupplier =
        () -> Stream.of( .. ) // onTaskResult() feeds this
            .map(result -> {
                if (result.isLastResult())
                    // logic here
            });
Something like that. If I can do this without the client knowing how many results to expect, I'm golden. Right now submitJobAsync() returns void; I would like to keep it that way, but I am also open to other options ...
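One way this could look is sketched below: a small adapter queues each listener callback and exposes the queue as a sequential stream that ends right after the result flagged by isLastResult(). TaskResultStream and the minimal TaskResult stand-in are hypothetical names used only for illustration; the real listener interface and result type come from the client library. The sketch also assumes that the result marked as last really is delivered last.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical stand-in for the real TaskResult type.
final class TaskResult {
    private final String payload;
    private final boolean last;

    TaskResult(String payload, boolean last) {
        this.payload = payload;
        this.last = last;
    }

    String payload() { return payload; }
    boolean isLastResult() { return last; }
}

// Hypothetical adapter: its onTaskResult() is the listener callback,
// and results() exposes the received results as a stream.
final class TaskResultStream {
    private final BlockingQueue<TaskResult> queue = new LinkedBlockingQueue<>();

    // Called by the client for every result; may run on another thread.
    public void onTaskResult(TaskResult result) {
        queue.add(result);
    }

    // Sequential stream that blocks while waiting for the next result
    // and ends after the element for which isLastResult() is true.
    public Stream<TaskResult> results() {
        Spliterator<TaskResult> source = new Spliterators.AbstractSpliterator<TaskResult>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            private boolean done;

            @Override
            public boolean tryAdvance(Consumer<? super TaskResult> action) {
                if (done) {
                    return false;
                }
                try {
                    TaskResult next = queue.take(); // blocks until a result arrives
                    done = next.isLastResult();
                    action.accept(next);
                    return true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    done = true;
                    return false;
                }
            }
        };
        return StreamSupport.stream(source, false);
    }
}
```

With something like this, the adapter would be registered where `this` is registered today, submitJobAsync() could keep returning void, and the consumer would call results() on its own thread, since the stream blocks while it waits for the next result.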
Alternatives
After reading Tomasz Nurkiewicz on CompletableFutures for an analogous scenario, I see an alternative that requires a minor change to the client:
    List<CompletableFuture<TaskResult>> taskFutures = client.submitJobAsync(new MultiTaskJob());
Here the client gets a list of CompletableFuture<TaskResult>, so we need to collect the results of the futures as they complete:
    // processes all task result futures
    // (assumes processResult returns a TaskResult)
    List<CompletableFuture<TaskResult>> processedFutures = taskFutures.stream()
        .map(taskFuture -> taskFuture.thenApply(this::processResult))
        .collect(Collectors.toList());
The article also shows the use of CompletableFuture.allOf(..) to perform the final processing, but only after all the futures have completed (this is pretty slick); that is when aggregation happens in my case. There is no code for that here, although the article did a great job of explaining it (I'm a total n00b with threads, though if I get it working I will post the code :-D)
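For reference, a minimal sketch of the allOf(..) idea, not the article's code: allResults is a hypothetical helper that turns a list of futures into one future holding all of their values, which is the point where an aggregation step could be attached.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FutureAggregation {
    // Returns a future that completes with every value, in list order,
    // once all of the given futures have completed.
    static <T> CompletableFuture<List<T>> allResults(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        return allDone.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join) // no blocking here: every future is already complete
                .collect(Collectors.toList()));
    }
}
```

Something like allResults(taskFutures).thenAccept(...) would then run the aggregation once, after the last task result is in, without the client counting results itself.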