Sink for line-by-line file IO with backpressure

I have a file processing job that currently uses actors with manually managed backpressure to run the processing pipeline, but I have never managed to control the backpressure successfully at the stage where the input file is read.

The job takes an input file and groups the lines by the identification number at the beginning of each line. When it reaches a line with a new identification number, it pushes the grouped lines to the handler via a message, then continues with the new identification number, and so on until it reaches the end of the file.

This seems like a good use case for Akka Streams, using the file as a sink, but I'm still not sure about three things:

1) How can I read the file line by line?

2) How can I group by the identifier present on each line? I am currently using very imperative processing for this, and I don't think I will have the same ability in a stream pipeline.

3) How can I apply backpressure so that I do not read lines into memory faster than I can process the data downstream?

2 answers

Akka Streams' groupBy is one approach. However, groupBy has a maxSubstreams parameter, which requires you to know the maximum number of ID ranges up front. So the solution below uses scan to identify blocks with the same ID and splitWhen to split them into substreams:

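    import java.io.File

    import akka.Done
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{FileIO, Framing}
    import akka.util.ByteString

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    object Main extends App {
      implicit val system = ActorSystem("system")
      implicit val materializer = ActorMaterializer()

      // Split an "ID,data" line into an (id, data) pair.
      def extractId(s: String) = {
        val a = s.split(",")
        a(0) -> a(1)
      }

      val file = new File("/tmp/example.csv")

      // Note: later Akka versions deprecate FileIO.fromFile in favour of FileIO.fromPath.
      private val lineByLineSource = FileIO.fromFile(file)
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
        .map(_.utf8String)

      val future: Future[Done] = lineByLineSource
        .map(extractId)
        // Tag each element with a flag: true if it starts a new ID range.
        .scan((false, "", ""))((l, r) => (l._2 != r._1, r._1, r._2))
        .drop(1)
        .splitWhen(_._1)
        // Collect the data of one ID range and drop the flag.
        .fold(("", Seq[String]()))((l, r) => (r._2, l._2 ++ Seq(r._3)))
        .concatSubstreams
        .runForeach(println)

      private val reply = Await.result(future, 10.seconds)
      println(s"Received $reply")
      Await.ready(system.terminate(), 10.seconds)
    }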

extractId splits each line into an id -> data tuple. scan prepends a start-of-range flag to each id -> data tuple. drop discards the primer element that scan emits first. splitWhen starts a new substream at the beginning of each range. fold collects each substream into a list and removes the boolean start-of-ID-range flag, so that each substream produces a single element. Instead of fold, you probably want a custom SubFlow that processes the stream of lines for a single ID and emits some result for that ID range. concatSubstreams merges the per-range substreams created by splitWhen back into a single stream, which is printed by runForeach.

Run with:

    $ cat /tmp/example.csv
    ID1,some input
    ID1,some more input
    ID1,last of ID1
    ID2,one line of ID2
    ID3,2nd before eof
    ID3,eof

Output:

    (ID1,List(some input, some more input, last of ID1))
    (ID2,List(one line of ID2))
    (ID3,List(2nd before eof, eof))
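
To make the scan / drop / splitWhen / fold steps easier to follow, here is a minimal sketch that traces the same logic with plain Scala collections instead of Akka Streams. It has no backpressure and is only meant to show what each stage produces; ScanFlagsSketch, flagged and groups are hypothetical names introduced for this illustration, and the foldLeft stands in for splitWhen plus fold.

```scala
object ScanFlagsSketch {
  // Same idea as extractId in the answer: "ID,data" becomes (id, data).
  def extractId(s: String): (String, String) = {
    val a = s.split(",")
    a(0) -> a(1)
  }

  // Mirrors .map(extractId).scan(...).drop(1): each element gets a flag that
  // is true when its ID differs from the ID of the previous element.
  def flagged(lines: List[String]): List[(Boolean, String, String)] =
    lines
      .map(extractId)
      .scanLeft((false, "", ""))((l, r) => (l._2 != r._1, r._1, r._2))
      .drop(1) // discard the primer element emitted first by scanLeft

  // Stand-in for splitWhen(_._1) followed by fold: a flag of true opens a
  // new group, any other element is appended to the current group.
  def groups(elems: List[(Boolean, String, String)]): List[(String, List[String])] =
    elems.foldLeft(Vector.empty[(String, List[String])]) {
      case (acc, (startsRange, id, data)) =>
        if (startsRange || acc.isEmpty) acc :+ (id -> List(data))
        else acc.init :+ (id -> (acc.last._2 :+ data))
    }.toList
}
```

Calling ScanFlagsSketch.groups(ScanFlagsSketch.flagged(lines)) on the six lines of /tmp/example.csv should yield the same three ID groups as the stream version.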

It seems that the easiest way to add backpressure to your system without making huge changes is to simply change the mailbox type of the actor that consumes the input groups to a BoundedMailbox.

  • Change the mailbox type of the actor that consumes your lines to a BoundedMailbox with a high mailbox-push-timeout-time:

     bounded-mailbox {
       mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox"
       mailbox-capacity = 1
       mailbox-push-timeout-time = 1h
     }

     val actor = system.actorOf(Props(classOf[InputGroupsConsumingActor]).withMailbox("bounded-mailbox"))
  • Create an iterator over the lines of your file, then build a grouped (by id) iterator on top of it. Then simply iterate over the data, sending the groups to the consuming actor. Note that the send will block in this case once the actor's mailbox is full.

     import akka.actor.ActorRef
     import scala.io.Source

     // Groups consecutive elements that share the same key.
     def iterGroupBy[A, K](iter: Iterator[A])(keyFun: A => K): Iterator[Seq[A]] = {
       def rec(s: Stream[A]): Stream[Seq[A]] =
         if (s.isEmpty) Stream.empty
         else {
           s.span(keyFun(s.head) == keyFun(_)) match {
             case (prefix, suffix) => prefix.toList #:: rec(suffix)
           }
         }
       rec(iter.toStream).toIterator
     }

     val lines = Source.fromFile("input.file").getLines()
     // The key here is the first character of each line; replace it with your own id extraction.
     iterGroupBy(lines) { l => l.headOption }.foreach { lines: Seq[String] =>
       actor.tell(lines, ActorRef.noSender)
     }

That's it! You will probably want to move the file reading to a separate thread, since it is going to block. Also, by adjusting mailbox-capacity you can control the amount of memory consumed. But if reading a batch from the file is always faster than processing it, it seems reasonable to keep the capacity small, for example 1 or 2.
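
To illustrate the blocking hand-off on its own, here is a minimal sketch that swaps the bounded actor mailbox for a plain java.util.concurrent.ArrayBlockingQueue. This is not how Akka implements its mailboxes; it only shows the principle that a reader thread blocks as soon as the bounded buffer is full. BoundedHandoffSketch and handOff are hypothetical names, and the None end-of-input marker is an assumption made for this example.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedHandoffSketch {
  // Pushes every group through a bounded queue from a separate reader thread
  // and returns the groups in the order the consumer received them.
  def handOff(groups: Iterator[Seq[String]], capacity: Int): Vector[Seq[String]] = {
    val queue = new ArrayBlockingQueue[Option[Seq[String]]](capacity)

    // Reader thread: put() blocks while the queue is full, so the reader can
    // never run more than `capacity` groups ahead of the consumer.
    val reader = new Thread(() => {
      groups.foreach(g => queue.put(Some(g)))
      queue.put(None) // end-of-input marker
    })
    reader.start()

    // Consumer: take() blocks while the queue is empty.
    val received = Vector.newBuilder[Seq[String]]
    var done = false
    while (!done) {
      queue.take() match {
        case Some(group) => received += group
        case None        => done = true
      }
    }
    reader.join()
    received.result()
  }
}
```

With capacity set to 1, at most one group waits in the queue while the consumer is busy, which corresponds to the small mailbox-capacity suggested above.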

Update: iterGroupBy is implemented with Stream and was tested not to produce a StackOverflowError.
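
If you would rather avoid recursion altogether, one possible alternative is a loop over a BufferedIterator. This is a sketch of a different technique, not the implementation above; BufferedGrouping and groupConsecutive are hypothetical names. It keeps only the current group in memory and uses constant stack depth.

```scala
object BufferedGrouping {
  // Groups consecutive elements that share the same key, without recursion.
  def groupConsecutive[A, K](iter: Iterator[A])(keyFun: A => K): Iterator[Seq[A]] = {
    val buffered = iter.buffered // lets us peek at the next element via head
    new Iterator[Seq[A]] {
      def hasNext: Boolean = buffered.hasNext

      def next(): Seq[A] = {
        val key = keyFun(buffered.head) // throws NoSuchElementException if empty
        val group = Vector.newBuilder[A]
        // Consume elements for as long as the key stays the same.
        while (buffered.hasNext && keyFun(buffered.head) == key)
          group += buffered.next()
        group.result()
      }
    }
  }
}
```

It can be used in the same way as iterGroupBy, for example BufferedGrouping.groupConsecutive(lines)(_.takeWhile(_ != ',')) to key on everything before the first comma.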


Source: https://habr.com/ru/post/1247242/

