Akka Streams' groupBy is one approach, but groupBy takes a maxSubstreams parameter, which requires you to know the maximum number of ID ranges ahead of time (a sketch of that approach is shown after the output below). So the solution below instead uses scan to identify blocks that share an identifier and splitWhen to split the stream into substreams:
import java.io.File

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object Main extends App {
  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()

  // Split a CSV row into an (id, data) pair.
  def extractId(s: String) = {
    val a = s.split(",")
    a(0) -> a(1)
  }

  val file = new File("/tmp/example.csv")

  private val lineByLineSource = FileIO.fromFile(file)
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
    .map(_.utf8String)

  val future: Future[Done] = lineByLineSource
    .map(extractId)
    // Flag each row with true when its id differs from the previous row's id.
    .scan((false, "", ""))((l, r) => (l._2 != r._1, r._1, r._2))
    .drop(1) // discard the primer (zero) element emitted by scan
    .splitWhen(_._1) // start a new substream at each start-of-range flag
    // Collect each ID range into an (id, Seq of data) pair.
    .fold(("", Seq[String]()))((l, r) => (r._2, l._2 ++ Seq(r._3)))
    .concatSubstreams
    .runForeach(println)

  private val reply = Await.result(future, 10.seconds)
  println(s"Received $reply")
  Await.ready(system.terminate(), 10.seconds)
}
extractId breaks each row into an id -> data tuple. scan prefixes each id -> data tuple with a start-of-range flag that is true whenever the id differs from the previous row's. drop discards the primer element that scan emits before the first row. splitWhen starts a new substream at each beginning of an ID range. fold collects each substream into a list and drops the start-of-range boolean, so that each substream emits a single element. Instead of folding, you probably want custom SubFlow processing that handles the rows of a single ID and emits some result per ID range (see the sketch below). concatSubstreams merges the substreams created by splitWhen back into a single stream, which runForeach prints.
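For example, a minimal sketch of such a per-range stage, here just counting the rows of each ID. processRange is a hypothetical name, not part of the solution above:

import akka.NotUsed
import akka.stream.scaladsl.Flow

// Consumes all rows of one ID range and emits a single summary string.
// The tuple layout matches the scan output above: (startOfRange, id, data).
val processRange: Flow[(Boolean, String, String), String, NotUsed] =
  Flow[(Boolean, String, String)]
    .fold(("", 0))((acc, row) => (row._2, acc._2 + 1))
    .map { case (id, count) => s"$id: $count rows" }

Substituting .via(processRange) for the fold between splitWhen and concatSubstreams would make each substream emit one such summary line instead of the collected list.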
Run with:
$ cat /tmp/example.csv
ID1,some input
ID1,some more input
ID1,last of ID1
ID2,one line of ID2
ID3,2nd before eof
ID3,eof
Output:
(ID1,List(some input, some more input, last of ID1))
(ID2,List(one line of ID2))
(ID3,List(2nd before eof, eof))
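For comparison, a minimal sketch of the groupBy approach mentioned at the start, reusing lineByLineSource and extractId from above. The bound of 100 distinct IDs is an arbitrary assumption, and mergeSubstreams does not preserve the ordering that concatSubstreams gives:

val grouped: Future[Done] = lineByLineSource
  .map(extractId)
  // groupBy needs an upper bound on the number of distinct keys up front;
  // 100 here is an assumed maximum, not something derived from the data.
  .groupBy(100, _._1)
  .fold(("", Seq.empty[String]))((acc, t) => (t._1, acc._2 :+ t._2))
  .mergeSubstreams
  .runForeach(println)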