Akka Streams: how do I model capacity / rate limiting in a system of 2 related streams?

Let's say I have a pizza oven and a queue of pizzas that I need to bake. My oven can bake only 4 pizzas at a time, and it is reasonable to expect that there will always be at least 4 pizzas in the queue throughout the day, so the oven should be kept as full as possible, as often as possible.

Every time I put a pizza in the oven, I set a timer on my phone. As soon as it goes off, I take the pizza out of the oven, give it to whoever wants it, and capacity becomes available.

I have 2 sources here: one is the queue of pizzas to be cooked, and the other is the egg timer that goes off when a pizza has finished cooking. The system also has 2 sinks: one is the destination for the cooked pizza, and the other is the place to send a confirmation that the pizza has been put in the oven.

I currently model these naively, as follows:

Source.fromIterator(() => pizzas)
    .map(putInOven) // puts in oven and sets a timer
    .runWith(Sink.actorRef(confirmationDest, EndSignal))

Source.fromIterator(() => timerAlerts)
    .map(removePizza)
    .runWith(Sink.actorRef(pizzaDest, EndSignal))

However, these two streams are currently completely independent of each other. The egg timer stream works correctly, removing a pizza whenever its timer goes off. But it cannot signal to the first stream that capacity has become available. In fact, the first stream has no concept of capacity at all, and will simply try to cram pizzas into the oven as soon as they join the queue.

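I am not sure what the idiomatic way to model this in Akka Streams is.

I have been considering something like the following: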

   ┌─────────────┐
┌─>│CapacityAvail│>──┐
│  └─────────────┘   │   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
│  ┌─────────────┐   ├──>│     Zip     │>─>│  PutInOven  │>─>│   Confirm   │
│  │    Queue    │>──┘   └─────────────┘   └─────────────┘   └─────────────┘
│  └─────────────┘
│  ┌─────────────┐       ┌─────────────┐
│  │    Done     │>─────>│  SendPizza  │
│  └─────────────┘       └─────────────┘
│         v
│         │
└─────────┘

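The idea is that there is a fixed pool of CapacityAvailable objects that populate the CapacityAvail source. This source is zipped with the Pizza queue, so if no capacity is available, no pizza processing begins, because the zip waits for both inputs.

Then, once a pizza is done, a CapacityAvailable object is pushed back into the pool.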

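The main obstacle is that I do not know how to create and populate the pool behind the CapacityAvail source, or whether a Source can also act as a Sink. Are there any Source/Sink/Flow types that would suit this problem?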


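This is pretty much the faux-state machine from the question. The capacity pool is a Source.queue: it is seeded with the initial capacity, drained by zipping it with the pizza queue, and topped up from the oven's event stream.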

RunnableGraph.fromGraph(GraphDSL.create() {
  implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

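    // Our capacity bucket. It can be refilled by offering CapacityAvailable
    // objects to capacitySrc, and consumed by using capacity as a Source.
    // peekMatValue is a helper defined elsewhere; it is assumed to return the
    // source together with a Future of its materialized queue.
    val (capacity, capacitySrc) =
      peekMatValue(Source.queue[CapacityAvailable.type](CONCURRENT_CAPACITY,
                                                        OverflowStrategy.fail))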

    // Set initial capacity
    capacitySrc.foreach(c =>
      Seq.fill(CONCURRENT_CAPACITY)(CapacityAvailable).foreach(c.offer))


    // Pull pizzas from the RabbitMQ queue
    val cookQ = RabbitSource(rabbitControl, channel(qos = CONCURRENT_CAPACITY),
                             consume(queue("pizzas-to-cook")), body(as[TaskRun]))

    // Take the blocking events stream and turn into a source
    // (Blocking in a separate dispatcher)
    val cookEventsQ = Source.fromIterator(() => oven.events().asScala)
        .withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))

    // Split the events stream into two sources so 2 flows can be attached
    val bc = builder.add(Broadcast[PizzaEvent](2))

    // Zip pizzas with the capacity pool. Stops cooking pizzas when oven full.
    // When cooking starts, send the confirmation back to rabbitMQ
    cookQ.zip(AckedSource(capacity)).map(_._1)
      .mapAsync(CONCURRENT_CAPACITY)(pizzaOven.cook)
      .map(Message.queue(_, "pizzas-started-cooking"))
      .acked ~> Sink.actorRef(rabbitControl, HostDied)

    // Send the cook events stream into two flows
    cookEventsQ ~> bc.in

    // The first tops up the capacity pool
    bc.out(0)
      .mapAsync(CONCURRENT_CAPACITY)(e =>
         capacitySrc.flatMap(cs => cs.offer(CapacityAvailable))
      ) ~> Sink.ignore

    // The second sends out cooked events
    bc.out(1)
      .map(p => Message.queue(Cooked(p.id()), "pizzas-cooked")) ~>
      Sink.actorRef(rabbitControl, HostDied)

    ClosedShape
}).run()
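
The graph above relies on a peekMatValue helper that the snippet does not define. A minimal sketch of how such a helper is commonly written follows. It assumes the helper's job is to return the source unchanged, plus a Future that completes with the materialized value (here the queue) once the graph is run. The object name MatValueHelper is hypothetical.

```scala
import akka.stream.scaladsl.Source

import scala.concurrent.{Future, Promise}

object MatValueHelper {
  // Assumed shape of the helper used in the answer: wraps a source so that its
  // materialized value can be observed from outside the graph.
  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
    val promise = Promise[M]()
    val wrapped = src.mapMaterializedValue { m =>
      // Completed when the stream is materialized; trySuccess keeps the first
      // value if the source happens to be materialized more than once.
      promise.trySuccess(m)
      m
    }
    (wrapped, promise.future)
  }
}
```

Because the Future completes only when the stream is materialized, calls such as capacitySrc.foreach(...) in the graph above take effect after run(), which is why the initial capacity can be queued up before the graph starts.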

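Akka Streams is an implementation of Reactive Streams, so the rate of a stream is governed by back-pressure: upstream stages emit only when downstream stages signal demand. Capping the number of pizzas in the oven is then a question of limiting concurrency.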


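You can use mapAsyncUnordered with parallelism=4. The function passed to it can return a Future that completes after the baking time, using the after pattern (http://doc.akka.io/docs/akka/2.4/scala/futures.html#After).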
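
A minimal sketch of this suggestion, assuming Akka 2.4/2.5 (ActorMaterializer is deprecated in later versions). The names OvenSketch, Pizza, Cooked, bake and bakeAll are hypothetical, and the in-oven counter exists only to make the limit observable. mapAsyncUnordered(4) keeps at most 4 unfinished Futures in flight, and akka.pattern.after plays the role of the egg timer.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object OvenSketch {
  final case class Pizza(id: Int)   // hypothetical input type
  final case class Cooked(id: Int)  // hypothetical output type

  val OvenCapacity = 4

  // Bookkeeping for the demonstration only: pizzas currently baking, and the peak.
  private val inOven = new AtomicInteger(0)
  val peakInOven = new AtomicInteger(0)

  private def recordPeak(now: Int): Unit = {
    var seen = peakInOven.get()
    while (now > seen && !peakInOven.compareAndSet(seen, now)) seen = peakInOven.get()
  }

  // "Puts the pizza in the oven" and completes once the timer fires.
  def bake(p: Pizza, bakeTime: FiniteDuration)(implicit system: ActorSystem): Future[Cooked] = {
    import system.dispatcher
    recordPeak(inOven.incrementAndGet())
    after(bakeTime, system.scheduler) {
      inOven.decrementAndGet() // the slot is free again
      Future.successful(Cooked(p.id))
    }
  }

  // At most OvenCapacity calls to bake are unfinished at any time.
  def bakeAll(pizzas: immutable.Seq[Pizza], bakeTime: FiniteDuration)
             (implicit system: ActorSystem, mat: Materializer): Future[immutable.Seq[Cooked]] =
    Source(pizzas)
      .mapAsyncUnordered(OvenCapacity)(p => bake(p, bakeTime))
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("oven")
    implicit val mat: Materializer = ActorMaterializer()
    val cooked = Await.result(bakeAll((1 to 10).map(Pizza(_)), 200.millis), 10.seconds)
    println(s"cooked ${cooked.size} pizzas, peak in oven: ${peakInOven.get()}")
    system.terminate()
  }
}
```

This folds both streams into one, so it fits only when the stream itself can own the timer. If the "done" events come from an external source, as in the RabbitMQ answer above, a token pool is still needed.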


Source: https://habr.com/ru/post/1659362/

