How to deal with a source that emits Future[T]?

Let's say I have an iterator:

val nextElemIter: Iterator[Future[Int]] = Iterator.continually(...) 


And I want to create a source from this iterator:

 val source: Source[Future[Int], NotUsed] = Source.fromIterator(() => nextElemIter) 


So now my source is emitting Futures. I have never seen Futures passed between stages in the Akka docs or anywhere else, so instead I could always do something like this:

 val source: Source[Int, NotUsed] = Source.fromIterator(() => nextElemIter).mapAsync(1)(identity /* was n => n */) 


And now I have a regular source that emits T instead of Future[T]. But it seems hacky and wrong.

What is the proper way to deal with such situations?

1 answer

Answering your question directly: I agree with Vladimir’s comment that there is nothing “hacky” about using mapAsync for the purpose you described. I can't think of a more direct way to unwrap the Future around your underlying Int values.
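To make the mapAsync approach concrete, here is a minimal sketch, assuming Akka 2.6 or later with akka-stream on the classpath. The object name, the bounded stand-in for nextElemIter, and the sample values are hypothetical and only there so the stream completes.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FlattenFutureSource {
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6+, where an implicit ActorSystem also supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("flatten-futures")
    import system.dispatcher // ExecutionContext used by Future(...)

    // Hypothetical stand-in for nextElemIter, bounded so the stream terminates.
    def nextElemIter: Iterator[Future[Int]] =
      Iterator.from(1).take(5).map(i => Future(i * 10))

    try {
      val result: Future[Seq[Int]] =
        Source.fromIterator(() => nextElemIter)
          .mapAsync(1)(identity) // Source[Future[Int], _] becomes Source[Int, _]
          .runWith(Sink.seq[Int])
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

mapAsync emits results in upstream order, so a parallelism above 1 would let several Futures be in flight at once without reordering the output.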

Answering your question indirectly...

Try to stick to futures

Streams, as a concurrency mechanism, are incredibly useful when backpressure is required. However, pure Future operations have their place in applications as well.

If your Iterator[Future[Int]] is going to produce a known, limited number of Future values, then you may want to stick with Futures for concurrency.
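As a rough illustration of the "known, limited number" case, the sketch below bounds an otherwise endless iterator with take and collects the results with Future.sequence, using only the standard library. The object name, the firstN helper, and the stand-in iterator are hypothetical, and the global ExecutionContext is an assumption made to keep the example short.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BoundedFutures {
  // Hypothetical stand-in for nextElemIter: endless, like Iterator.continually.
  def nextElemIter: Iterator[Future[Int]] =
    Iterator.from(1).map(i => Future(i))

  // take(n) bounds the iterator; toList forces it, which starts exactly n Futures.
  // Future.sequence then turns List[Future[Int]] into Future[List[Int]].
  def firstN(n: Int): Future[List[Int]] =
    Future.sequence(nextElemIter.take(n).toList)

  def main(args: Array[String]): Unit =
    println(Await.result(firstN(5), 5.seconds))
}
```

Without the take, forcing the iterator would never finish, which is why this approach only fits a bounded number of Futures.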

Imagine you want to filter, map, and reduce the Int values.

    def isGoodInt(i: Int): Boolean = ???        // filter
    def transformInt(i: Int): Int = ???         // map
    def combineInts(i: Int, j: Int): Int = ???  // reduce

Futures provide a direct way to use these functions:

    // Needs an implicit ExecutionContext in scope; nextElemIter must be finite.
    val finalVal: Future[Int] =
      Future
        .sequence(nextElemIter.toSeq) // launch all of the Futures
        .map(_.filter(isGoodInt).map(transformInt).reduce(combineInts))

Compare this with the somewhat indirect way of using a stream, as you suggested:

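    // Needs an implicit Materializer (or, in Akka 2.6+, an implicit ActorSystem) in scope.
    val finalVal: Future[Int] =
      Source.fromIterator(() => nextElemIter)
        .via(Flow[Future[Int]].mapAsync(1)(identity))
        .via(Flow[Int].filter(isGoodInt))
        .via(Flow[Int].map(transformInt))
        .runWith(Sink.reduce[Int](combineInts)) // .to(...).run() would materialize NotUsed, not Future[Int]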

Source: https://habr.com/ru/post/1257383/

