Iterate with hasNext() and next() over an asynchronously generated stream of elements

I need to implement an Iterator interface (as defined by the Java API), with hasNext() and next() methods, which should return result elements that come from an asynchronously processed HTTP response (handled by Akka actors).

The following requirements must be met:

  • Do not block until the whole async operation has completed, since generating a large result set may take some time (the iterator should return result elements as soon as they become available).
  • Iterator.next() should block until the next element is available (or throw an exception if there are no more elements).
  • Iterator.hasNext() should return true as long as more elements remain (even if the next one is not yet available).
  • The total number of results is not known in advance. The actors producing the results will send a specific "final message" when they are done.
  • Try to avoid relying on InterruptedException, e.g. when the iterator is waiting on an empty queue but no more elements will be produced.

I haven't looked into Java 8 streams or Akka streams yet. But since I basically need to iterate over a queue (a finite stream), I doubt that there is a suitable ready-made solution.

Currently, my Scala stub uses java.util.concurrent.BlockingQueue and looks like this:

    import java.util.concurrent.ArrayBlockingQueue
    import akka.actor.Actor

    case class Result(value: Any) // sent by result producing actor
    case object Done              // sent by result producing actor when finished

    class ResultStreamIterator extends Iterator[Result] {
      val resultQueue = new ArrayBlockingQueue[Option[Result]](100)

      def hasNext(): Boolean = ??? // return true if not done yet
      def next(): Result = ???     // take() next element if not done yet

      class ResultCollector extends Actor {
        def receive = {
          case result: Result => resultQueue.put(Some(result))
          case Done           => resultQueue.put(None) // None marks the end of the stream
        }
      }
    }

I use Option[Result] to indicate the end of the result stream with None. I have experimented with peeking at the next element and using a "done" flag, but I hope there is an easier solution.

Bonus questions:

  • How can the sync/async behavior be covered by unit tests, especially the delay in generating results?
  • How can the iterator be made thread-safe?
3 answers

I followed Jiro's suggestions and made some adaptations as needed. In general, I like the approach of implementing hasNext() and next() as ask messages sent to an actor. This ensures that only one thread modifies the queue at any time.

However, I'm not sure about the performance of this implementation, since ask and Await.result will create two threads for each hasNext() and next() call.

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.language.postfixOps

    import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash}
    import akka.pattern.ask
    import akka.util.Timeout

    case object HasNext
    case object GetNext

    case class Result(value: Any)
    case object Done

    class ResultCollector extends Actor with Stash {

      val queue = scala.collection.mutable.Queue.empty[Result]

      // While results are still arriving, requests that cannot be answered yet are stashed.
      def collecting: Actor.Receive = {
        case HasNext       => if (queue.isEmpty) stash() else sender ! true
        case GetNext       => if (queue.isEmpty) stash() else sender ! queue.dequeue()
        case value: Result => unstashAll(); queue += value
        case Done          => unstashAll(); context become serving
      }

      // After Done, the remaining queue is drained and then reported as exhausted.
      def serving: Actor.Receive = {
        case HasNext => sender ! queue.nonEmpty
        case GetNext => sender ! { if (queue.nonEmpty) queue.dequeue() else new NoSuchElementException }
      }

      def receive = collecting
    }

    class ResultStreamIteration(resultCollector: ActorRef) extends Iterator[Any] {

      implicit val timeout: Timeout = Timeout(30 seconds)

      override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
        case b: Boolean => b
      }

      override def next(): Any = Await.result(resultCollector ? GetNext, Duration.Inf) match {
        case Result(value: Any) => value
        case e: Throwable       => throw e
      }
    }

    object Test extends App {
      implicit val exec = scala.concurrent.ExecutionContext.global
      val system = ActorSystem.create("Test")
      val actorRef = system.actorOf(Props[ResultCollector])
      Future {
        for (i <- 1 to 10000) actorRef ! Result(s"Result $i")
        actorRef ! Done
      }
      val iterator = new ResultStreamIteration(actorRef)
      while (iterator.hasNext()) println(iterator.next)
      system.shutdown()
    }

The following code should meet the requirements. An actor's fields can be modified safely inside its receive method, so resultQueue should be a field of the actor rather than of the iterator.

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    import akka.actor.{Actor, ActorRef, Stash}
    import akka.pattern.ask
    import akka.util.Timeout

    // ResultCollector must be initialized first, e.g.:
    //   resultCollector ! Initialize(100)
    // Result is the case class from the question.
    class ResultStreamIterator(resultCollector: ActorRef) extends Iterator[Result] {
      implicit val timeout: Timeout = ???

      override def hasNext(): Boolean = Await.result(resultCollector ? HasNext, Duration.Inf) match {
        case ResponseHasNext(hasNext) => hasNext
      }

      @scala.annotation.tailrec
      final override def next(): Result = Await.result(resultCollector ? RequestResult, Duration.Inf) match {
        case ResponseResult(result) => result
        case Finished               => throw new NoSuchElementException("There is no result.")
        case WaitingResult          => next() // should wait a moment before retrying
      }
    }

    case object RequestResult
    case object HasNext
    case class ResponseResult(result: Result)
    case class ResponseHasNext(hasNext: Boolean)
    case object Finished
    case object WaitingResult
    case class Initialize(expects: Int)

    // This code may be more elegant with Actor FSM.
    // Actor states: (beforeInitialized) -> (collecting) -> (allCollected)
    class ResultCollector extends Actor with Stash {

      val results = scala.collection.mutable.Queue.empty[Result]
      var expects = 0
      var counts = 0
      var isAllCollected = false

      def beforeInitialized: Actor.Receive = {
        case Initialize(n) =>
          expects = n
          if (expects != 0) context.become(collecting)
          else context.become(allCollected)
          unstashAll()
        case _ => stash()
      }

      def collecting: Actor.Receive = {
        case RequestResult =>
          if (results.isEmpty) sender ! WaitingResult
          else sender ! ResponseResult(results.dequeue())
        case HasNext => sender ! ResponseHasNext(true) // the reply must be sent, not just constructed
        case result: Result =>
          results += result
          counts += 1
          isAllCollected = counts >= expects
          if (isAllCollected) context.become(allCollected)
      }

      def allCollected: Actor.Receive = {
        case RequestResult =>
          if (results.isEmpty) sender ! Finished
          else sender ! ResponseResult(results.dequeue())
        case HasNext => sender ! ResponseHasNext(results.nonEmpty)
      }

      def receive = beforeInitialized
    }

You can hold the next element in a variable and simply wait for it at the start of both methods:

    // null means "nothing fetched yet"; None is the end-of-stream marker from the queue
    private var nextNext: Option[Result] = null

    def hasNext(): Boolean = {
      if (nextNext == null) nextNext = resultQueue.take() // blocks until an element or None arrives
      return !nextNext.isEmpty
    }

    def next(): Result = {
      if (nextNext == null) nextNext = resultQueue.take()
      if (nextNext.isEmpty) throw new NoSuchElementException()
      val result = nextNext.get
      nextNext = null // consumed; the next call fetches again
      return result
    }
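For anyone who wants to try this lookahead idea without Akka, here is a minimal self-contained sketch. It is only an illustration under some assumptions: the names buffered, done and Demo are made up for this example, a plain thread stands in for the result-producing actor, and both methods are wrapped in this.synchronized as one simple way to approach the thread-safety bonus question. Note that hasNext blocks until either an element or the None end marker arrives, because with an unknown result count there is no other way to know whether more elements remain.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

final case class Result(value: Any)

// Iterates over a queue that a producer fills with Some(result) items
// and closes with a single None (the "final message").
class ResultStreamIterator(queue: BlockingQueue[Option[Result]]) extends Iterator[Result] {
  private var buffered: Option[Result] = None // fetched by hasNext, not yet returned by next
  private var done = false                    // true once the None end marker was taken

  // Blocks until the producer delivers either an element or the end marker.
  override def hasNext: Boolean = this.synchronized {
    if (!done && buffered.isEmpty) {
      queue.take() match {
        case some @ Some(_) => buffered = some
        case None           => done = true
      }
    }
    buffered.nonEmpty
  }

  override def next(): Result = this.synchronized {
    if (!hasNext) throw new NoSuchElementException("result stream is exhausted")
    val result = buffered.get
    buffered = None
    result
  }
}

object Demo {
  // Hypothetical producer thread standing in for the result-producing actor.
  def collectAll(): List[Any] = {
    val queue = new ArrayBlockingQueue[Option[Result]](100)
    val producer = new Thread(new Runnable {
      def run(): Unit = {
        for (i <- 1 to 3) {
          Thread.sleep(50) // simulate slow result generation
          queue.put(Some(Result(s"Result $i")))
        }
        queue.put(None) // end-of-stream marker
      }
    })
    producer.start()
    val collected = new ResultStreamIterator(queue).map(_.value).toList
    producer.join()
    collected
  }
}
```

Calling Demo.collectAll() drains the three delayed results in order. The same pattern (a producer that sleeps before each put) could be a starting point for a unit test of delayed result generation. Once the end marker has been seen, repeated hasNext calls return false without touching the queue again, so the iterator never waits on an empty queue after the stream has finished.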

Source: https://habr.com/ru/post/986860/

