How to close a file after the stream ends?

I have a stream of java.io.File objects, and I use flatMapConcat to create a new Source for each file, something like this:

def func(files: List[File]) =
  Source(files)
    .map(f => openFile(f))
    .flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
    .grouped(10)
    .via(SomeFlow)
    .runWith(Sink.ignore)

Is there an easy way to close each file after its stream ends? SomePublisher() can't close it.


I found one good way to solve my problem, but if you know another way, I would like to see it too.

def someSource(file: File) = {
  val f = openFile(file)

  Source
    .fromPublisher(SomePublisher(f))
    .transform(() => new PushStage[?, ?] {
      override def onPush(elem: ?, ctx: Context[?]): SyncDirective = ctx.push(elem)

      override def postStop(): Unit = {
        f.close()
        super.postStop()
      }
    })
}

def func(files: List[File]) =
  Source(files)
    .flatMapConcat(someSource)
    .grouped(10)
    .via(SomeFlow)
    .runWith(Sink.ignore)
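
Another way that might work, without writing a custom stage, is to attach the cleanup with watchTermination, which hands you a Future that completes when the sub-stream completes, fails, or is cancelled. Below is a minimal sketch under some assumptions: the names CloseOnTermination, fileSource, open and publisherFor are hypothetical stand-ins (open plays the role of openFile, publisherFor the role of SomePublisher), and the opened resource is assumed to implement java.io.Closeable.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher
import java.io.{Closeable, File}
import scala.concurrent.ExecutionContext

object CloseOnTermination {
  // Hypothetical helper: builds one Source per file and closes the opened
  // resource when that Source terminates for any reason.
  def fileSource[R <: Closeable, T](file: File)(open: File => R)(
      publisherFor: R => Publisher[T])(
      implicit ec: ExecutionContext): Source[T, NotUsed] = {
    // Opened eagerly, as in the answer above; flatMapConcat calls this
    // function right before it materializes the sub-stream.
    val resource = open(file)

    Source
      .fromPublisher(publisherFor(resource))
      .watchTermination() { (_, done) =>
        // `done` completes on normal completion, failure, or cancellation.
        // The callback runs asynchronously on `ec`.
        done.onComplete(_ => resource.close())
        NotUsed
      }
  }
}
```

With this, the pipeline from the question would become something like Source(files).flatMapConcat(f => CloseOnTermination.fileSource(f)(openFile)(SomePublisher(_))), with an ExecutionContext in implicit scope. Note that close() runs shortly after termination rather than strictly before the next file is opened. If reading the file can be expressed as a pull function instead of a Publisher, Source.unfoldResource(create, read, close) is also worth a look, since it manages opening and closing for you.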


Source: https://habr.com/ru/post/1625187/

