I am trying to implement the producer-consumer program in Scala without using a queue. Since I think Actor already implements a "mail queue" or something similar, it would be redundant to write that code again.
I tried to write the program purely with Actors. Below is a program with multiple producers and multiple consumers. Each producer sleeps for a while to simulate doing some work. The consumers do not sleep at all.
However, I don't know how to shut the program down without adding a supervisor actor to monitor the consumers, together with a Promise object so that I can use Await (see the Supervisor class in the code).
Is there any way to get rid of them?
    import akka.actor.Actor.Receive
    import akka.actor._
    import akka.routing._
    import akka.util._

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    class Producer(val pool: ActorRef)(val name: String) extends Actor {
      def receive = {
        case _ =>
          while (true) {
            val sleepTime = scala.util.Random.nextInt(1000)
            Thread.sleep(sleepTime)
            println("Producer %s send food" format name)
            pool ! name
          }
      }
    }

    class Consumer(supervisor: ActorRef)(val name: String) extends Actor {
      var counter = 0

      def receive = {
        case s =>
          counter += 1
          println("%s eat food produced by %s" format (name, s))
          if (counter >= 10) {
            println("%s is full" format name)
            context.stop(self)
            supervisor ! 1
          }
      }
    }

    class Supervisor(p: Promise[String]) extends Actor {
      var r = 3

      def receive = {
        case _ =>
          r -= 1
          if (0 == r) {
            println("All consumer stopped")
            context.stop(self)
            p success ("Good")
          }
      }
    }

    object Try3 {
      def work(): Unit = {
        val system = ActorSystem("sys1")
        val nProducer = 5
        val nConsumer = 3
        val p = Promise[String]
        val supervisor = system.actorOf(Props(new Supervisor(p)))
        val arrConsumer = for (i <- 1 to nConsumer) yield
          system.actorOf(Props(new Consumer(supervisor)("Consumer %d" format (i))))
        val poolConsumer = system.actorOf(Props.empty.withRouter(RoundRobinRouter(arrConsumer)))
        val arrProducer = for (i <- 1 to nProducer) yield
          system.actorOf(Props(new Producer(poolConsumer)("Producer %d" format (i))))

        arrProducer foreach (_ ! "start")
        Await.result(p.future, Duration.Inf)
        println("great!")
        system.shutdown
      }

      def main(args: Array[String]): Unit = {
        work()
      }
    }
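For reference, the shutdown part of the code above (the Supervisor counter plus the Promise) boils down to a "count down, then complete a Promise" pattern. Below is a minimal sketch of just that pattern without actors, assuming plain `scala.concurrent` Futures stand in for the consumers. The names `ShutdownSketch`, `run`, and `remaining` are hypothetical and not part of my program.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch: plain Futures stand in for the Consumer actors.
object ShutdownSketch {
  // Blocks until all `nConsumer` consumers have reported that they are full.
  def run(nConsumer: Int, mealsPerConsumer: Int): String = {
    val done = Promise[String]()                  // plays the role of `p`
    val remaining = new AtomicInteger(nConsumer)  // plays the role of `r` in Supervisor

    (1 to nConsumer).foreach { _ =>
      Future {
        // Stand-in for a consumer eating `mealsPerConsumer` times.
        (1 to mealsPerConsumer).foreach(_ => ())
        // Only the consumer that brings the counter to zero completes the
        // promise, so `success` is called exactly once.
        if (remaining.decrementAndGet() == 0) done.success("Good")
      }
    }

    // A finite timeout instead of Duration.Inf, so a bug cannot hang forever.
    Await.result(done.future, 10.seconds)
  }

  def main(args: Array[String]): Unit =
    println(run(nConsumer = 3, mealsPerConsumer = 10)) // prints "Good"
}
```

This only shows why both pieces appear in my code: something has to count the finished consumers, and something has to let the main thread wait for that count to reach zero.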
The receive function of the Producer class has a problem: it never terminates, because it contains a while loop with no exit condition.
The only way I can think of is to have the producer send a message to itself. I wonder, is this the usual way to implement such a requirement?
Here is the modified code:
    class Producer(val pool: ActorRef)(val name: String) extends Actor {
      // original implementation:
      // def receive = {
      //   case _ =>
      //     while (true) {
      //       val sleepTime = scala.util.Random.nextInt(1000)
      //       Thread.sleep(sleepTime)
      //       println("Producer %s send food" format name)
      //       pool ! name
      //     }
      // }

      case object Loop

      def receive = {
        case _ =>
          val sleepTime = scala.util.Random.nextInt(1000)
          Thread.sleep(sleepTime)
          println("Producer %s send food" format name)
          pool ! name
          self ! Loop // send message to itself
      }
    }
Regardless of my implementation, what is the correct way to implement a producer-consumer program in Scala using Actor or Future/Promise?