Akka Stream, source from function?

I want a source that evaluates a function at a given interval and emits its result. As a workaround, I can do this with Source.queue + offer, but I have not found a cleaner way. Ideally, I would have something like:

def myFunction() = ....                     // function with side-effects 
Source.tick(1.second, 1.second, myFunction) // myFunction is evaluated at every tick

Any ideas?

2 answers

Perhaps the cleanest way is to use map, so that the function is called once per tick:

Source.tick(1.second, 1.second, NotUsed).map(_ ⇒ myFunction())
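For anyone who wants to try this end to end, here is a minimal runnable sketch of the tick + map approach. The object name TickExample, the counter-based myFunction, and the take(5) cutoff are illustrative assumptions, not part of the question; ActorMaterializer is used to match the other answer, although newer Akka versions can derive the materializer from the implicit ActorSystem.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._

object TickExample extends App {
  implicit val system: ActorSystem = ActorSystem("tick-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // execution context for onComplete

  // Hypothetical side-effecting function: returns an increasing counter.
  private val counter = new AtomicInteger(0)
  def myFunction(): Int = counter.incrementAndGet()

  Source
    .tick(1.second, 1.second, NotUsed) // emit a dummy element every second
    .map(_ => myFunction())            // evaluate the function once per tick
    .take(5)                           // stop after five elements (demo only)
    .runForeach(println)
    .onComplete(_ => system.terminate())
}
```

Because the tick element is discarded in map, any placeholder value works; NotUsed just makes it explicit that the element carries no information.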

I think throttle is what you need. Here is a complete, runnable example with Source applied to an Iterable whose iterator calls the function in next():

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ThrottleMode.Shaping
import akka.stream.scaladsl.Source

import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
var i = 0

def myFunction(): Int = {
  i = i + 1
  i
}

import scala.collection.immutable.Iterable

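// An endless Iterable: every call to next() evaluates myFunction()
val x: Iterable[Int] = new Iterable[Int] {
  override def iterator: Iterator[Int] =
    new Iterator[Int] {
      override def hasNext: Boolean = true

      override def next(): Int = myFunction()
    }
}

// Emit at most 1 element per second, with a maximum burst of 1
Source(x).throttle(1, 1.second, 1, Shaping).runForeach(println)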

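The parameters to throttle are: 1 element per 1 second, with max burst = 1, pausing before emitting elements to keep to that rate (that is what Shaping does).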
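The hand-written Iterable can probably be shortened. The sketch below is one possible variant, assuming Source.fromIterator together with Iterator.continually is acceptable in place of the custom Iterable; the object name ThrottleExample and the take(5) cutoff are illustrative assumptions.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.Source

import scala.concurrent.duration._

object ThrottleExample extends App {
  implicit val system: ActorSystem = ActorSystem("throttle-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // execution context for onComplete

  var i = 0

  // Same side-effecting function as in the answer above.
  def myFunction(): Int = {
    i = i + 1
    i
  }

  Source
    .fromIterator(() => Iterator.continually(myFunction())) // endless iterator over myFunction()
    .throttle(1, 1.second, 1, ThrottleMode.Shaping)         // 1 element per second, burst of 1
    .take(5)                                                // stop after five elements (demo only)
    .runForeach(println)
    .onComplete(_ => system.terminate())
}
```

One difference from Source.tick that may matter: as far as I understand, throttle only delays emission, so the function can be evaluated somewhat before its result is emitted downstream. If the evaluation time itself matters, the tick + map approach is likely the safer choice.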


Source: https://habr.com/ru/post/1675526/

