Does groupBy leak memory in akka-stream?

I want to build an akka-stream pipeline that groups events from an endless stream by session_uid and calculates the amount of traffic for each session (details are in my previous question).

I am going to use Source#groupBy to group events by session_uid, but it looks like this function accumulates all the group keys internally and offers no way to release them. This leads to a java.lang.OutOfMemoryError: Java heap space exception. Here is code that reproduces it:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Flow, Sink, Source}

    import scala.util.Random

    object GroupByMemoryLeakApplication extends App {
      implicit val system = ActorSystem()
      import system.dispatcher
      implicit val materializer = ActorMaterializer()

      val bigString = Random.nextString(512 * 1024)

      // This is infinite stream of events (i.e. this is session ids)
      val eventsSource = Source(() => (1 to 1000000000).iterator)
        .map((i) => {
          (i, bigString + i)
        })

      // This is flow pass event through groupBy function
      val groupByFlow = Flow[(Int, String)]
        .groupBy(_._2)
        .map {
          case (sessionUid, sessionEvents) =>
            sessionEvents
              .map(e => { println(e._1); e })
              .runWith(Sink.head)
        }
        .mapAsync(4)(identity)

      eventsSource
        .via(groupByFlow)
        .runWith(Sink.ignore)
        .onComplete(_ => system.shutdown())
    }

So, how can I release a grouping key (sessionUid) inside groupBy once the associated event stream (sessionEvents) has been fully processed?
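To make the behaviour I am after more concrete, here is a rough sketch in plain Scala, without akka-stream. It is only an illustration under assumptions: the Event type, its bytes field and its last flag (marking the final event of a session) are hypothetical and do not appear in the code above. The point is that the per-session state is dropped as soon as the session ends, so memory is bounded by the number of open sessions rather than by the number of sessions ever seen.

```scala
object SessionTotalsSketch {
  // Hypothetical event shape: `bytes` and `last` are assumptions for illustration.
  final case class Event(sessionUid: String, bytes: Long, last: Boolean)

  // Sums traffic per session and removes the key once the session's last
  // event arrives, so the map only holds sessions that are still open.
  def sessionTotals(events: Iterator[Event]): Iterator[(String, Long)] = {
    val open = scala.collection.mutable.Map.empty[String, Long]
    events.flatMap { e =>
      val total = open.getOrElse(e.sessionUid, 0L) + e.bytes
      if (e.last) {
        open.remove(e.sessionUid) // release the grouping key
        Iterator.single(e.sessionUid -> total)
      } else {
        open.update(e.sessionUid, total)
        Iterator.empty
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Iterator(
      Event("a", 10L, last = false),
      Event("b", 5L, last = false),
      Event("a", 7L, last = true),
      Event("b", 1L, last = true)
    )
    sessionTotals(events).foreach(println)
  }
}
```

What I am looking for is the akka-stream equivalent of the open.remove call: some way to tell groupBy that a key is finished.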

Alternatively, does anyone know another way to group events by session_uid in akka-stream?


Source: https://habr.com/ru/post/1236489/

