Using an Event Listener as a Java 8 Stream Source

Quick n dirty

Is it possible to restructure a traditional event listener into a Java 8 Stream, so that the listener's events become the stream source?

Long story

A client submits an arbitrary job, then listens for results:

    Client client = new JobClient();
    client.addTaskListener(this);
    client.submitJobAsync(new MultiTaskJob()); // returns void, important (see below)

    public void onTaskResult(TaskResult result) {
        if (result.isLastResult())
            aggregateJobResults(result);
        else
            processResult(result);
    }

Problem

For any job submitted, the client receives n results, but it does not know how many results it will receive (it uses isLastResult() to determine when to stop and aggregate).

Goal

I want to refactor the listener into a "supplier", or something similar, so that onTaskResult() is the source of the stream:

    Supplier<TaskResult> taskResultSupplier =
        () -> Stream.of( .. ) // onTaskResult() feeds this
                    .map(result -> {
                        if (result.isLastResult())
                            // logic here
                    });

Something like that; if I can do this without the client knowing how many results to expect, I'm golden. Right now submitJobAsync() returns void, and I would like to keep it that way, but I am also open to other options...

Alternatives

After reading Tomasz Nurkiewicz's article on CompletableFutures for an analogous scenario, I see an alternative option that assumes a minor change to the client:

    List<CompletableFuture<TaskResult>> taskFutures =
        client.submitJobAsync(new MultiTaskJob());

Here the client gets a list of CompletableFuture<TaskResult>, so we need to collect the results of the futures as they complete:

    // processes all task result futures
    // (assumes processResult returns the processed TaskResult)
    List<CompletableFuture<TaskResult>> processedFutures = taskFutures.stream()
        .map(taskResult -> taskResult.thenApply(this::processResult))
        .collect(Collectors.toList());

The article also shows the use of CompletableFuture.allOf(..) to perform final processing, but only after all the futures have completed (this is pretty slick); that is when aggregation would happen in my case. I have no code of my own for this yet, although the article did a great job explaining it (I'm a total n00b with threads; if I get it working I will post the code :-D)
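For illustration, a minimal sketch of that allOf(..) step could look like the following. It is not taken from the article or from the client API: the TaskResult stand-in, the allResults helper, and an aggregateJobResults that simply sums values are all assumptions made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfSketch {

    // Hypothetical stand-in for the question's TaskResult type.
    static final class TaskResult {
        final int value;

        TaskResult(int value) {
            this.value = value;
        }
    }

    // Hypothetical helper: completes with all results once every future has completed.
    static CompletableFuture<List<TaskResult>> allResults(
            List<CompletableFuture<TaskResult>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        // join() does not block here: allDone completes only after every future has.
        return allDone.thenApply(ignored ->
                futures.stream()
                       .map(CompletableFuture::join)
                       .collect(Collectors.toList()));
    }

    // Assumed aggregation step: sums the values of all results.
    static int aggregateJobResults(List<TaskResult> results) {
        return results.stream().mapToInt(r -> r.value).sum();
    }

    public static void main(String[] args) {
        List<CompletableFuture<TaskResult>> taskFutures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> new TaskResult(1)),
                CompletableFuture.supplyAsync(() -> new TaskResult(2)),
                CompletableFuture.supplyAsync(() -> new TaskResult(3)));

        int total = allResults(taskFutures)
                .thenApply(AllOfSketch::aggregateJobResults)
                .join();
        System.out.println(total);
    }
}
```

With this shape the client never needs to know how many results to expect up front; it only needs the list of futures, and aggregation runs once, after the last one completes.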

2 answers

You can build a Stream around your TaskResults. See this example:

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    /**
     * Created for http://stackoverflow.com/q/27670421/1266906.
     */
    public class AsyncToStream {

        public static void main(String[] args) {
            System.out.println("Unbuffered Test:");
            AsyncTaskResultIterator<TaskResult> taskListener1 = new AsyncTaskResultIterator<>();
            new TaskResultGenerator(taskListener1, 5).start();
            taskListener1.unbufferedStream().forEach(System.out::println);

            System.out.println("Buffered Test:");
            AsyncTaskResultIterator<TaskResult> taskListener2 = new AsyncTaskResultIterator<>();
            new TaskResultGenerator(taskListener2, 5).start();
            taskListener2.bufferedStream().forEach(System.out::println);
        }

        /**
         * This class wraps a sequence of TaskResults into an iterator up to the first
         * TaskResult where {@code isLastResult()} returns {@code true}
         */
        public static class AsyncTaskResultIterator<T extends TaskResult>
                implements Iterator<T>, TaskListener<T> {

            /**
             * This acts as an asynchronous buffer so we can easily wait for the next TaskResult
             */
            private final BlockingQueue<T> blockingQueue;

            /**
             * Becomes {@code true} once {@code TaskResult.isLastResult()} is received.
             * Volatile because it is written by the consuming thread and read by the
             * thread that calls {@code onTaskResult}.
             */
            private volatile boolean ended;

            public AsyncTaskResultIterator() {
                blockingQueue = new LinkedBlockingQueue<>();
            }

            /**
             * Waits on a new TaskResult and returns it as long as the previous TaskResult
             * did not specify {@code isLastResult()}. Afterwards no more elements can be
             * retrieved.
             */
            @Override
            public T next() {
                if (ended) {
                    throw new NoSuchElementException();
                } else {
                    try {
                        T next = blockingQueue.take();
                        ended = next.isLastResult();
                        return next;
                    } catch (InterruptedException e) {
                        throw new IllegalStateException("Could not retrieve next value", e);
                    }
                }
            }

            @Override
            public boolean hasNext() {
                return !ended;
            }

            /**
             * Enqueue another TaskResult for retrieval
             */
            @Override
            public void onTaskResult(T result) {
                if (ended) {
                    throw new IllegalStateException(
                            "Already received a TaskResult with isLastResult() == true");
                }
                try {
                    blockingQueue.put(result);
                } catch (InterruptedException e) {
                    throw new IllegalStateException("Could not enqueue next value", e);
                }
            }

            /**
             * Builds a Stream that acts upon the results just when they become available
             */
            public Stream<T> unbufferedStream() {
                Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(this, 0);
                return StreamSupport.stream(spliterator, false);
            }

            /**
             * Buffers all results and builds a Stream around the results
             */
            public Stream<T> bufferedStream() {
                Stream.Builder<T> builder = Stream.builder();
                this.forEachRemaining(builder);
                return builder.build();
            }
        }

        public static class TaskResultImpl implements TaskResult {
            private boolean lastResult;
            private String name;

            public TaskResultImpl(boolean lastResult, String name) {
                this.lastResult = lastResult;
                this.name = name;
            }

            @Override
            public String toString() {
                return "TaskResultImpl{" +
                        "lastResult=" + lastResult +
                        ", name='" + name + '\'' +
                        '}';
            }

            @Override
            public boolean isLastResult() {
                return lastResult;
            }
        }

        public static interface TaskListener<T extends TaskResult> {
            public void onTaskResult(T result);
        }

        public static interface TaskResult {
            boolean isLastResult();
        }

        private static class TaskResultGenerator extends Thread {
            private final TaskListener<TaskResult> taskListener;
            private final int count;

            public TaskResultGenerator(TaskListener<TaskResult> taskListener, int count) {
                this.taskListener = taskListener;
                this.count = count;
            }

            @Override
            public void run() {
                try {
                    // Emit count - 1 intermediate results, then one final result
                    for (int i = 1; i < count; i++) {
                        Thread.sleep(200);
                        taskListener.onTaskResult(new TaskResultImpl(false, String.valueOf(i)));
                    }
                    Thread.sleep(200);
                    taskListener.onTaskResult(new TaskResultImpl(true, String.valueOf(count)));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

You did not supply your definitions of TaskResult and TaskListener, so I made up my own. AsyncTaskResultIterator only works for a single sequence of TaskResults. If no TaskResult with isLastResult() == true is ever supplied, next() will wait forever, and therefore so will the unbuffered Stream and the construction of the buffered Stream.
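If waiting forever is a concern, one possible variation is to bound each wait with BlockingQueue.poll(timeout, unit) instead of take(). The sketch below is a separate, simplified illustration rather than part of the example above: the class name TimedResultIterator, its offer method, and the Predicate used to detect the last element are all assumptions introduced here.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical variant: fails with an exception instead of blocking forever.
public class TimedResultIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final Predicate<T> isLast;   // assumed way to recognise the final element
    private final long timeoutMillis;    // maximum wait per element
    private boolean ended;

    public TimedResultIterator(Predicate<T> isLast, long timeoutMillis) {
        this.isLast = isLast;
        this.timeoutMillis = timeoutMillis;
    }

    // Called from the listener callback to hand over a result.
    public void offer(T result) {
        queue.add(result);
    }

    @Override
    public boolean hasNext() {
        return !ended;
    }

    @Override
    public T next() {
        if (ended) {
            throw new NoSuchElementException();
        }
        try {
            // poll returns null if nothing arrives within the timeout
            T next = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (next == null) {
                ended = true;
                throw new IllegalStateException("No result within " + timeoutMillis + " ms");
            }
            ended = isLast.test(next);
            return next;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public Stream<T> stream() {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(this, 0), false);
    }

    public static void main(String[] args) {
        Predicate<String> last = "last"::equals;
        TimedResultIterator<String> it = new TimedResultIterator<>(last, 500);
        it.offer("a");
        it.offer("last");
        it.stream().forEach(System.out::println);
    }
}
```

The trade-off is that a slow but healthy producer can now trigger a failure, so the timeout has to be chosen with the expected gap between results in mind.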


Source: https://habr.com/ru/post/980211/

