I need to implement the Iterator interface (as defined by the Java API), with hasNext() and next() methods, so that it returns result elements originating from an asynchronously processed HTTP response (handled by Akka actors).
The following requirements must be met:
- Do not block and wait for the async operation to complete, since generating a large set of results may take some time (the iterator should return the elements of the result as soon as they become available).
- Iterator.next() should block until the next element is available (or throw an exception if there are no more elements).
- Iterator.hasNext() should return true as long as more elements remain (even if the next one is not yet available).
- The total number of results is not known in advance. The actors producing the results will send a specific "finish message" when they are done.
- Try to avoid InterruptedException, e.g. when the iterator is waiting on an empty queue but no more elements will be produced.
I haven't looked into Java 8 streams or Akka streams yet. But since I basically have to iterate over a queue (a finite stream), I doubt that they offer a suitable ready-made solution.
Currently, my Scala stub uses java.util.concurrent.BlockingQueue and looks like this:
    import java.util.concurrent.ArrayBlockingQueue
    import akka.actor.Actor

    class ResultStreamIterator extends Iterator[Result] {
      // None marks the end of the result stream
      val resultQueue = new ArrayBlockingQueue[Option[Result]](100)

      def hasNext: Boolean = ??? // return true if not done yet
      def next(): Result = ???   // take() next element if not done yet

      case class Result(value: Any) // sent by result producing actor
      case object Done              // sent by result producing actor when finished

      class ResultCollector extends Actor {
        def receive = {
          // enqueue the Result itself so it matches the queue's element type
          case r: Result => resultQueue.put(Some(r))
          case Done      => resultQueue.put(None)
        }
      }
    }
I use Option[Result] so that None can signal the end of the result stream. I experimented with peeking at the next element and keeping a "done" flag, but I hope there is an easier solution.
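For illustration only, a look-ahead variant of the kind described above might be sketched as follows. This is not the code I actually tried. The constructor parameter and the `lookahead` and `done` fields are assumptions, and `Result` is moved to the top level to keep the sketch self-contained. Note that `hasNext` blocks here until either an element or the end marker arrives, because the total number of results is unknown.

```scala
import java.util.concurrent.BlockingQueue

final case class Result(value: Any)

// Hypothetical sketch: a None in the queue marks the end of the stream.
class ResultStreamIterator(queue: BlockingQueue[Option[Result]])
    extends Iterator[Result] {

  // Element already taken from the queue by hasNext but not yet returned.
  private var lookahead: Option[Result] = None
  // Set once the end marker (None) has been seen.
  private var done = false

  // Blocks until an element or the end marker is available.
  override def hasNext: Boolean = synchronized {
    if (!done && lookahead.isEmpty) {
      queue.take() match {
        case some @ Some(_) => lookahead = some
        case None           => done = true
      }
    }
    lookahead.isDefined
  }

  // Returns the buffered element, or throws if the stream has ended.
  override def next(): Result = synchronized {
    if (!hasNext) throw new NoSuchElementException("result stream is finished")
    val result = lookahead.get
    lookahead = None
    result
  }
}
```

Because the end marker travels through the same queue as the results, a consumer waiting on an empty queue is woken by the marker itself, so no interrupt is needed to end the iteration. The `synchronized` blocks only make individual calls atomic. A `hasNext` followed by `next()` from several consumer threads is still a check-then-act sequence.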
Bonus questions:
- How can such a sync/async implementation be covered by unit tests, especially the delayed generation of results?
- How can the iterator be made thread-safe?
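To make the first bonus question more concrete, a delay test might be structured roughly like this, without any test framework or actors. The names `DelayedResultCheck` and `iteratorOver` are hypothetical, and the iterator is built from `Iterator.continually` over the queue rather than from the class above, purely to keep the sketch short.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object DelayedResultCheck {
  // Hypothetical helper: iterate over the queue until the None end marker.
  def iteratorOver[A](queue: ArrayBlockingQueue[Option[A]]): Iterator[A] =
    Iterator.continually(queue.take()).takeWhile(_.isDefined).map(_.get)

  def run(): List[Int] = {
    val queue = new ArrayBlockingQueue[Option[Int]](10)
    val it = iteratorOver(queue)

    // Consume on another thread so the test thread controls the timing.
    val first = Future(it.next())
    Thread.sleep(100)
    assert(!first.isCompleted, "next() should still be blocked on the empty queue")

    // The "delayed" result arrives and unblocks the consumer.
    queue.put(Some(1))
    assert(Await.result(first, 2.seconds) == 1)

    // One more result, then the end marker.
    queue.put(Some(2))
    queue.put(None)
    1 :: it.toList
  }
}
```

The test thread plays the producer and decides when each element arrives. The check that `next()` is still blocked cannot fail spuriously, since a consumer that has not started yet is also not completed.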