What is the right way to implement Consumer Producer in scala

I am trying to implement the Consumer Producer program in scala without using a queue. Since I think that Actor has already implemented the "mail queue" or something else, it would be superfluous to write the code again.

I tried to write the program in Actor cleanly. Below is a multi-user program for several consumers. The producer has been sleeping for a while to simulate doing something. Consumers do not sleep at all.

However, I donโ€™t know how to turn off the program if I donโ€™t add an observer actor to monitor consumers, as well as a Promise object for using โ€œAwaitโ€ (Supervisor class in code)

Is there any way to get rid of them?

import akka.actor.Actor.Receive import akka.actor._ import akka.routing._; import akka.util._ import scala.concurrent.{Await, Promise} import scala.concurrent.duration._ class Producer(val pool:ActorRef)(val name:String) extends Actor { def receive = { case _ => while (true) { val sleepTime = scala.util.Random.nextInt(1000) Thread.sleep(sleepTime) println("Producer %s send food" format name) pool ! name } } } class Consumer(supervisor : ActorRef)(val name:String) extends Actor { var counter = 0 def receive = { case s => counter += 1 println("%s eat food produced by %s" format (name,s)) if (counter >= 10) { println("%s is full" format name) context.stop(self) supervisor ! 1 } } } class Supervisor(p:Promise[String]) extends Actor { var r = 3 def receive = { case _ => r -= 1 if (0 == r) { println("All consumer stopped") context.stop(self) p success ("Good") } } } object Try3 { def work(): Unit = { val system = ActorSystem("sys1") val nProducer = 5; val nConsumer = 3; val p = Promise[String] val supervisor = system.actorOf(Props(new Supervisor(p))); val arrConsumer = for ( i <- 1 to nConsumer) yield system.actorOf( Props( new Consumer(supervisor)( "Consumer %d" format (i) ) ) ) val poolConsumer = system.actorOf(Props.empty.withRouter( RoundRobinRouter(arrConsumer) )) val arrProducer = for ( i <- 1 to nProducer) yield system.actorOf( Props( new Producer(poolConsumer)( "Producer %d" format (i) ) ) ) arrProducer foreach (_ ! "start") Await.result(p.future,Duration.Inf) println("great!") system.shutdown } def main(args:Array[String]): Unit = { work() } } 

The Producer class get function has a problem that it will not be closed because it is in a state without violation.

The only way I can think of is to "send a message to the producer himself." Interestingly, is this the usual way to implement such a request?

Here is the modified code:

 class Producer(val pool:ActorRef)(val name:String) extends Actor { // original implementation: // def receive = { // case _ => // while (true){ // val sleepTime = scala.util.Random.nextInt(1000) // Thread.sleep(sleepTime) // println("Producer %s send food" format name) // pool ! name // } // } case object Loop; def receive = { case _ => val sleepTime = scala.util.Random.nextInt(1000) Thread.sleep(sleepTime) println("Producer %s send food" format name) pool ! name self ! Loop //send message to itself } } 

Regardless of my implementation, what is the correct way to implement the Producer Consumer program in scala using Actor or Future / Promise?

+5
source share
2 answers

You should never block (in your case, Thread.sleep, while loop) inside the actor. A lock inside an actor rages a thread from a thread pool used by all participants. Even a small number of producers like yours would make the actor in the ActorSystem threadless and render them unusable.

Instead, use a Scheduler to schedule periodic messaging in your Producer.

 override def preStart(): Unit = { import scala.concurrent.duration._ import context.dispatcher context.system.scheduler.schedule( initialDelay = 0.seconds, interval = 1.second, receiver = pool, message = name ) } 
+3
source

What do you think about the implementation of Terminator Actor :)

 object Terminator { case class WatchMe(ref: ActorRef) } class Terminator extends Actor { var consumers: Map[ActorRef, ActorRef] = Map() def receive = { case WatchMe(ref) => { consumers += ref -> ref context.watch(ref) } case Terminated(ref) => { context.unwatch(ref) consumers.get(ref).foreach { ref -> ref ! PoisonPill } consumers -= ref //If all consumers are dead stop.self and delegate NoConsumers message higher in hierarchy if(consumers.size == 0) { delegate() context.stop(self) } } } } 
0
source

Source: https://habr.com/ru/post/1204399/


All Articles