How can I get the first completed actor from a group of actors in Scala?

I have a moderate number of long-running actors, and I want to write a synchronous function that returns the first one that completes. I can do this with a spin-wait on futures, for example:

    while (!fs.exists(f => f.isSet)) { Thread.sleep(100) }
    val completeds = fs.filter(f => f.isSet)
    completeds.head

but that seems very "un-Actor-y".

The scala.actors.Futures object has two methods, awaitAll() and awaitEither(), which seem terribly close; if there were an awaitAny(), I would jump on it. Am I missing an easy way to do this, or is there a general pattern that applies?

2 answers

The key for me was discovering that any thread (?) can act as an implicit actor through self, so you can use receive { } to block until a message arrives. Here is my source code:

    import scala.actors._
    import scala.actors.Actor._

    // Top-level class that wants to return the first-completed result from some long-running actors
    class ConcurrentQuerier() {
      // Synchronous function; perhaps fulfilling some legacy interface
      def synchronousQuery: String = {
        // Instantiate and start the monitoring Actor
        val progressReporter = new ProgressReporter(self) // self gives an Actor handle for the calling thread
        progressReporter.start()

        // Instantiate the long-running Actors, giving each a handle to the monitor
        val lrfs = List(
          new LongRunningFunction(0, 2000, progressReporter),
          new LongRunningFunction(1, 2500, progressReporter),
          new LongRunningFunction(3, 1500, progressReporter),
          new LongRunningFunction(4, 1495, progressReporter),
          new LongRunningFunction(5, 1500, progressReporter),
          new LongRunningFunction(6, 5000, progressReporter)
        )

        // Start 'em
        lrfs.foreach { lrf => lrf.start() }
        println("All actors started...")
        val start = System.currentTimeMillis()

        /* This blocks until it receives a String in the Inbox.
           Who sends the string? A: the progressReporter, which is monitoring the LongRunningFunctions */
        val s = receive { case s: String => s }
        println("Received " + s + " after " + (System.currentTimeMillis() - start) + " ms")
        s
      }
    }

    /* An Actor that reacts to a message that is a tuple ("COMPLETED", someResult)
       and sends the result to this Actor's owner. Not strictly necessary (the
       LongRunningFunctions could post directly to the owner's mailbox), but I like
       the idea that monitoring is important enough to deserve its own object */
    class ProgressReporter(val owner: Actor) extends Actor {
      def act() = {
        println("progressReporter awaiting news...")
        react {
          case ("COMPLETED", s) =>
            println("progressReporter received a completed signal " + s)
            owner ! s
          case s =>
            println("Unexpected message: " + s)
            act()
        }
      }
    }

    /* Some long-running function */
    class LongRunningFunction(val id: Int, val timeout: Int, val supervisor: Actor) extends Actor {
      def act() = {
        // Do the long-running query
        val s = longRunningQuery()
        println(id.toString + " finished, sending results")
        // Send the results back to the monitoring Actor (the progressReporter)
        supervisor ! ("COMPLETED", s)
      }

      def longRunningQuery(): String = {
        println("Starting Agent " + id + " with timeout " + timeout)
        Thread.sleep(timeout)
        "Query result from agent " + id
      }
    }

    val cq = new ConcurrentQuerier()
    // I don't think the Actor semantics guarantee that the result is absolutely,
    // positively the first to have posted the "COMPLETED" message
    println("Among the first to finish was : " + cq.synchronousQuery)

Typical results are as follows:

    scala ActorsNoSpin.scala
    progressReporter awaiting news...
    All actors started...
    Starting Agent 1 with timeout 2500
    Starting Agent 5 with timeout 1500
    Starting Agent 3 with timeout 1500
    Starting Agent 4 with timeout 1495
    Starting Agent 6 with timeout 5000
    Starting Agent 0 with timeout 2000
    4 finished, sending results
    progressReporter received a completed signal Query result from agent 4
    Received Query result from agent 4 after 1499 ms
    Among the first to finish was : Query result from agent 4
    5 finished, sending results
    3 finished, sending results
    0 finished, sending results
    1 finished, sending results
    6 finished, sending results

A more "actor-like" way of waiting for completion is to create an actor responsible for processing the completed result (let's call it ResultHandler).

Instead of replying to the caller, the workers send their results to the ResultHandler on a fire-and-forget basis. The handler then processes the result while the other workers finish their work.
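
A minimal sketch of that idea, assuming a Scala version that still ships scala.actors (as the question does). The names Completed, firstResult and ResultHandlerSketch are made up for illustration, and the workers only simulate a long-running query with Thread.sleep:

```scala
import scala.actors.Actor._

// Hypothetical names throughout: Completed, firstResult and the simulated
// workers are illustrations, not code from the answers above.
object ResultHandlerSketch {
  case class Completed(workerId: Int, result: String)

  // Blocks the calling thread until the first worker reports back.
  def firstResult(delaysMs: Seq[Int]): Completed = {
    // Capture the caller's actor handle here: inside an actor { } block,
    // self refers to the newly created actor instead.
    val caller = self

    // The ResultHandler: forwards the first Completed message, then terminates.
    val resultHandler = actor {
      react { case c: Completed => caller ! c }
    }

    // Workers: simulate a long-running query, then fire-and-forget the result.
    delaysMs.zipWithIndex.foreach { case (delay, id) =>
      actor {
        Thread.sleep(delay)
        resultHandler ! Completed(id, "Query result from worker " + id)
      }
    }

    // Synchronous wait for whatever the handler forwards.
    receive { case c: Completed => c }
  }
}
```

Calling ResultHandlerSketch.firstResult(Seq(400, 50, 300)) would normally hand back the message from worker 1, although message ordering between nearly simultaneous finishers is not something I would rely on. Because the handler forwards only one message and then stops, late results never reach the caller's mailbox.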


Source: https://habr.com/ru/post/1379400/

