Using dynamic receiver assignment in Akka Streams

I am new to Akka and trying to learn the basics. My use case is to constantly read messages from the JMS queue and output each message to a new file. I have basic settings working with:

Source<String, NotUsed> jmsSource =
  JmsSource
    .textSource(JmsSourceSettings
    .create(connectionFactory)
    .withQueue("myQueue")
    .withBufferSize(10));

Sink<ByteString, CompletionStage<IOResult>> fileSink =
  FileIO.toFile(new File("random.txt"));

final Flow<String, ByteString, NotUsed> flow = Flow.fromFunction((String n) -> ByteString.fromString(n));

final RunnableGraph<NotUsed> runnable = jmsSource.via(flow).to(fileSink);

runnable.run(materializer);

However, I want the file name to be dynamic (and not hardcoded in "random.txt"): it should be changed depending on the contents of each message in the queue. Of course, I could pick up the file name in the stream, but how can I set this name to fileSink? How do I set this up?

+5
source share
3 answers

Sink, akka.stream.impl.LazySink. , GitHub Gist.

import akka.NotUsed
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage._

class OneToOneOnDemandSink[T, +M](sink: T => Sink[T, M]) extends GraphStage[SinkShape[T]] {

  val in: Inlet[T] = Inlet("OneToOneOnDemandSink.in")
  override val shape = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    override def preStart(): Unit = pull(in)

    val awaitingElementHandler = new InHandler {
      override def onPush(): Unit = {
        val element = grab(in)
        val innerSource = createInnerSource(element)
        val innerSink = sink(element)
        Source.fromGraph(innerSource.source).runWith(innerSink)(subFusingMaterializer)
      }

      override def onUpstreamFinish(): Unit = completeStage()

      override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
    }
    setHandler(in, awaitingElementHandler)

    def createInnerSource(element: T): SubSourceOutlet[T] = {
      val innerSource = new SubSourceOutlet[T]("OneToOneOnDemandSink.innerSource")

      innerSource.setHandler(new OutHandler {
        override def onPull(): Unit = {
          innerSource.push(element)
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          } else {
            pull(in)
            setHandler(in, awaitingElementHandler)
          }
        }

        override def onDownstreamFinish(): Unit = {
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          }
        }
      })

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val illegalStateException = new IllegalStateException("Got a push that we weren't expecting")
          innerSource.fail(illegalStateException)
          failStage(illegalStateException)
        }

        override def onUpstreamFinish(): Unit = {
          // We don't stop until the inner stream stops.
          setKeepGoing(true)
        }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          innerSource.fail(ex)
          failStage(ex)
        }
      })

      innerSource
    }
  }
}

object OneToOneOnDemandSink {

  def apply[T, M](sink: T => Sink[T, M]): Sink[T, NotUsed] = Sink.fromGraph(new OneToOneOnDemandSink(sink))
}

Sink , , LazySink, .

+3

:

  • , map ,
  • , flatMapConcat ,
  • , GraphDSL.

:

$ tail -n +1 -- *.txt
==> 1.txt <==
1

==> 2.txt <==
2

==> 3.txt <==
3

==> 4.txt <==
4

==> 5.txt <==
5

  • map:

    import java.nio.file.Paths
    
    import akka.actor.ActorSystem
    import akka.stream._
    import akka.stream.scaladsl.{FileIO, Sink, Source}
    import akka.util.ByteString
    
    import scala.concurrent.Future
    
    object Example extends App {
      override def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("Example")
        implicit val materializer = ActorMaterializer()
    
        val result: Future[Seq[Future[IOResult]]] = Source(1 to 5)
          .map(
            elem => Source.single(ByteString(s"$elem\n"))
                .runWith(FileIO.toPath(Paths.get(s"$elem.txt")))
          )
          .runWith(Sink.seq)
    
        implicit val ec = system.dispatcher
        result.onComplete(_ => system.terminate())
      }
    }
    

    : map Int , Source.single(ByteString(...)).runWith(FileIO.toPath(...), Future[IOResult] Sink.seq.

    Documentation

    map , .

    ,

    , upstream

    . :


  1. flatMapConcat:

    import java.nio.file.Paths
    
    import akka.actor.ActorSystem
    import akka.stream._
    import akka.stream.scaladsl.{FileIO, Sink, Source}
    import akka.util.ByteString
    
    import scala.concurrent.Future
    
    object Example extends App {
      override def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("Example")
        implicit val materializer = ActorMaterializer()
    
        val result: Future[Seq[Future[IOResult]]] = Source(1 to 5)
          .flatMapConcat(
            elem => Source.single(
              Source.single(ByteString(s"$elem\n"))
                .runWith(FileIO.toPath(Paths.get(s"$elem.txt")))
            )
          )
          .runWith(Sink.seq)
    
        implicit val ec = system.dispatcher
        result.onComplete(_ => system.terminate())
      }
    }
    

    : flatMapConcat a Source. , mat Source.single(ByteString(...)).runWith(FileIO.toPath(...), Future[IOResult] Sink.seq. .

    Documentation

    flatMapConcat Source, . , , .

    ,

    , ,

    . :


    1. Sink GraphDSL:

      import java.nio.file.Path
      
      import akka.stream.scaladsl.{Broadcast, FileIO, Flow, GraphDSL, Sink, Source, ZipWith}
      import akka.stream.{IOResult, Materializer, SinkShape}
      import akka.util.ByteString
      
      import scala.concurrent.Future
      
      object FileSinks {
        def dispatch[T](
                         dispatcher: T => Path,
                         serializer: T => ByteString
                       )(
                         implicit materializer: Materializer
                       ): Sink[T, Future[Seq[Future[IOResult]]]] =
          Sink.fromGraph(
            GraphDSL.create(
              Sink.seq[Future[IOResult]]
            ) {
              implicit builder =>
                sink =>
                  // prepare this sink graph elements:
                  val broadcast = builder.add(Broadcast[T](2))
                  val serialize = builder.add(Flow[T].map(serializer))
                  val dispatch = builder.add(Flow[T].map(dispatcher))
                  val zipAndWrite = builder.add(ZipWith[ByteString, Path, Future[IOResult]](
                    (bytes, path) => Source.single(bytes).runWith(FileIO.toPath(path)))
                  )
      
                  // connect the graph:
                  import GraphDSL.Implicits._
                  broadcast.out(0) ~> serialize ~> zipAndWrite.in0
                  broadcast.out(1) ~> dispatch ~> zipAndWrite.in1
                  zipAndWrite.out ~> sink
      
                  // expose ports:
                  SinkShape(broadcast.in)
            }
          )
      }
      ----
      import java.nio.file.Paths
      
      import FileSinks
      import akka.actor.ActorSystem
      import akka.stream._
      import akka.stream.scaladsl.Source
      import akka.util.ByteString
      
      import scala.concurrent.Future
      
      object Example extends App {
        override def main(args: Array[String]): Unit = {
          implicit val system = ActorSystem("Example")
          implicit val materializer = ActorMaterializer()
      
          val result: Future[Seq[Future[IOResult]]] = Source(1 to 5)
            .runWith(FileSinks.dispatch[Int](
              elem => Paths.get(s"$elem.txt"),
              elem => ByteString(s"$elem\n"))
            )
      
          implicit val ec = system.dispatcher
          result.onComplete(_ => system.terminate())
        }
      }
      

. Akka Stream.

0

PartitionHub should serve your purpose. Being dynamic, you can create and attach a new receiver on demand.

-1
source

Source: https://habr.com/ru/post/1681790/


All Articles