Idiomatic timeouts for long-running processes in multi-threaded Scala

I see a number of questions on Stack Overflow that, in one way or another, ask how to "kill" a Future, a la the deprecated Thread.stop(). I see answers explaining why this is not possible, but none that offer an alternative mechanism for solving this kind of problem.

For example: "Practical use of futures? I.e., how to kill them?"

I understand that the future cannot be "killed."

I know how I would do this the Java way: break the task into smaller sleeps and keep a "volatile boolean isStillRunning" in the thread class that is checked periodically. If I cancel the thread by updating this value, the thread exits its loop. This involves "shared state" (the isStillRunning var), and if I did the same in Scala it would not seem very "functional".
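For reference, a minimal sketch of that Java-style approach written in Scala. The class name SlicedTask, its methods, and the 10 ms sleep are hypothetical stand-ins for illustration, not code from a real application:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical example class: a task split into `steps` small slices.
final class SlicedTask(steps: Int) {
  // The shared mutable state: written by the canceller, read by the worker.
  @volatile private var isStillRunning = true

  // Asks the worker to stop; it notices at the next check between slices.
  def cancel(): Unit = {
    isStillRunning = false
  }

  // Runs the slices on the given execution context and
  // returns how many slices actually completed.
  def run()(implicit ec: ExecutionContext): Future[Int] = Future {
    var done = 0
    while (done < steps && isStillRunning) {
      Thread.sleep(10) // stands in for one small slice of the real work
      done += 1
    }
    done
  }
}
```

Calling cancel() from any thread makes the loop exit at its next check, so the future completes early with a partial count rather than being "killed".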

What is the correct way to solve this problem in idiomatic, functional Scala? Is there a reasonably concise way to do it? Should I just fall back to "normal" threads and volatile flags? Should I use @volatile the same way as the Java keyword?

2 answers

I think I have found a better solution to my problem. Instead of using a mutable variable to let the operation know when to die, I can send the actor a higher-priority exit message. It looks something like this:

    val a = new Actor() {
      def act(): Unit = {
        loop {
          react {
            case "Exit" => exit(); return;
            case MyMessage => {
              //this check makes "Exit" a high priority message, if "Exit"
              //is on the queue, it will be handled before proceeding to
              //handle the real message.
              receiveWithin(0) {
                case "Exit" => exit(); return
                case TIMEOUT => //don't do anything.
              }
              sender ! "hi!" //reply to sender
            }
          }
        }
      }
    }
    a.start()
    val f = a !! MyMessage
    while( ! f.isSet && a.getState != Actor.State.Terminated ) {
      //will probably return almost immediately unless the Actor was terminated
      //after I checked.
      Futures.awaitAll(100,f)
    }
    if( a.getState != Actor.State.Terminated ) {
      f() // the future should evaluate to "hi!"
    }
    a ! "Exit" //stops the actor from processing anymore messages.
               //regardless of if any are still on the queue.
    a.getState // terminated

There is probably a cleaner way to write this ... but that's about what I did in my application.

The receiveWithin(0) call is an immediate no-op if there is no "Exit" message in the queue. The queued "Exit" message replaces the volatile boolean that I would have used in a threaded Java application.


Yes, it looks the same as in Java.

For a test rig, where a test may hang or run for too long, I use a promise to fail the test (for whatever reason). For example, a timeout monitor can "cancel" the test runner (interrupt the thread and compareAndSet a flag) and then complete the promise with a failure. Or test preparation can fail the test early. Or the test runs and produces a result. At a higher level, the test rig simply sees a future and its value.
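A minimal sketch of the promise-plus-monitor idea, using only scala.concurrent and java.util.concurrent. This is not the test rig's actual code: the names TimeoutSketch and withTimeout are hypothetical, and the promise's own tryFailure stands in for the compareAndSet flag, since only one side can complete a promise:

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

object TimeoutSketch {
  // One daemon thread that plays the role of the timeout monitor.
  private val monitor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "timeout-monitor")
      t.setDaemon(true)
      t
    }
  })

  // Runs `body` on its own thread. The returned future holds the result,
  // the body's own failure, or a TimeoutException, whichever comes first.
  def withTimeout[A](limit: FiniteDuration)(body: => A): Future[A] = {
    val toKeep = Promise[A]() // promise to keep, or to fail from the monitor

    val worker = new Thread(new Runnable {
      def run(): Unit =
        try toKeep.tryComplete(Try(body))
        catch { case _: InterruptedException => () } // monitor already failed the promise
    })
    worker.setDaemon(true)

    monitor.schedule(new Runnable {
      def run(): Unit =
        // tryFailure succeeds only if the worker has not completed the promise yet
        if (toKeep.tryFailure(new TimeoutException(s"no result within $limit")))
          worker.interrupt()
    }, limit.toMillis, TimeUnit.MILLISECONDS)

    worker.start()
    toKeep.future
  }
}
```

The caller only ever sees a Future[A]. Note that interrupt() is still cooperative: it stops work that is blocked or that checks the interrupt status, but a tight CPU loop keeps running even though the promise has already failed.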

What differs from Java is your options for composing futures.

    val all = Future.traverse(tests)(test => {
      val toKeep = promise[Result]   // promise to keep, or fail by monitor
      val f = for (w <- feed(prepare(test, toKeep))) yield {
        monitored(w, toKeep) {
          listener.start(w)
          w.runTest()
        }
      }
      f completing consume _
      // strip context
      val g = f map (r => r.copy(context = null))
      (toKeep completeWith g).future
    })

Source: https://habr.com/ru/post/1432621/

