How to test an Akka closed-shape stream with an encapsulated Source and Sink

I created an Akka stream that takes a process function and an error handler function. The Source and Sink are fully encapsulated in a ClosedShape RunnableFlow. My intention is to pass an item to the parent class and have it run through the stream. This all seems to work until I get to testing. I use ScalaTest, and inside the process function and the error handler function I append each item to a list. I randomly generate errors to see items flow to the error handler function as well. The problem is that if I pass 100 items to the parent class, I would expect the list of items in the error function and the list of items in the process function to add up to 100. Since the Source and Sink are fully encapsulated, I have no clear way to tell the test to wait, so it reaches the assert/should statements before all items have been processed through the stream. I created a gist to describe the stream (linked in the test below).

Here's an example test for the above:

import akka.actor._
import akka.stream._
import akka.testkit._
import org.scalatest._

class TestSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
    with WordSpecLike with Matchers with BeforeAndAfterAll {
  def this() = this(ActorSystem("TestSpec"))

  override def afterAll = {
    Thread.sleep(500)
    mat.shutdown()
    TestKit.shutdownActorSystem(system)
  }

  implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true))

  "TestSpec" must {
    "handle messages" in {
      val testStream = new Testing()                                                 // For Testing class see gist: https://gist.github.com/leftofnull/3e4c2a6b18fe71d219b6
      (1 to 100).map(n => testStream.processString(s"${n}${n * 2}${n * 4}${n * 8}")) // Give it 100 strings to chew on

      testStream.errors.size should not be (0)                                       // passes
      testStream.processed.size should not be (0)                                    // passes
      (testStream.processed.size + testStream.errors.size) should be (100)           // fails due to checking before all items are processed
    }
  }
}
1 answer

Per a comment from Viktor Klang on the related gist, this turned out to be a great solution:

def consume(
    errorHandler: BadData => Unit, fn: Data => Unit, a: String
  ): RunnableGraph[Future[akka.Done]] = RunnableGraph.fromGraph(
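    // Passing the Sink to GraphDSL.create exposes its Future[Done] as the graph's materialized value
    GraphDSL.create(Sink.foreach[BadData](errorHandler)) { implicit b: GraphDSL.Builder[Future[akka.Done]] => sink =>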
      import GraphDSL.Implicits._

      val source = b.add(Source.single(a))
      val broadcast = b.add(Broadcast[String](2))
      val merge = b.add(Zip[String, String]())
      val process = new ProcessorFlow(fn)
      val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
      val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
        (input: Xor[BadData, Data]) =>
          input.swap.getOrElse((new Throwable, ("", "")))
      ))

      source ~> broadcast.in
                broadcast.out(0) ~> Flow[String].map(_.reverse)       ~> merge.in0
                broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
                                                                         merge.out ~> process ~> failed ~> errors ~> sink

      ClosedShape
    }
  )

This lets me call Await.result on the Future materialized by the RunnableGraph for testing purposes. Thanks again to Viktor for this solution!
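
As a rough illustration of how a test can wait on that materialized value, here is a minimal sketch. It assumes Akka Streams 2.4 or later on the classpath (with the same ActorMaterializer API used in the question). The names AwaitClosedGraph, closedGraph and processAll are hypothetical stand-ins and are not taken from the gist; closedGraph is a much-simplified substitute for consume.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AwaitClosedGraph {
  // Hypothetical stand-in for consume(...): handing the Sink to GraphDSL.create
  // makes the Sink's Future[Done] the materialized value of the closed graph.
  def closedGraph(handler: String => Unit, a: String): RunnableGraph[Future[Done]] =
    RunnableGraph.fromGraph(
      GraphDSL.create(Sink.foreach[String](handler)) { implicit b => sink =>
        import GraphDSL.Implicits._
        Source.single(a) ~> sink.in
        ClosedShape
      }
    )

  // Runs n single-element graphs, blocks until every one has completed,
  // and returns how many elements the handler saw.
  def processAll(n: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("AwaitClosedGraph")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext needed by Future.sequence

    val seen = ListBuffer.empty[String]
    // Each graph runs its sink independently, so guard the shared buffer.
    val record: String => Unit = s => seen.synchronized { seen += s; () }

    try {
      val runs = (1 to n).map(i => closedGraph(record, i.toString).run())
      Await.result(Future.sequence(runs), 10.seconds)
      seen.synchronized(seen.size)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    println(processAll(100))
}
```

In a ScalaTest spec, the same idea would be to collect the Future returned by each run() call and await all of them before the should assertions, instead of relying on Thread.sleep.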


Source: https://habr.com/ru/post/1629634/