Scala Futures - Built-in Timeout?

There is an aspect of futures that I donโ€™t understand from the official ref textbook. http://docs.scala-lang.org/overviews/core/futures.html

Do scala futures have a built-in timeout mechanism? Say the example below is a 5 gigabyte text file ... the implicit scope of "Implicits.global" ultimately causes onFailure to fire in a non-blocking way or can it be determined? And without any default timeout, does this mean that not a single success and failure will ever work?

import scala.concurrent._ import ExecutionContext.Implicits.global val firstOccurence: Future[Int] = future { val source = scala.io.Source.fromFile("myText.txt") source.toSeq.indexOfSlice("myKeyword") } firstOccurence onSuccess { case idx => println("The keyword first appears at position: " + idx) } firstOccurence onFailure { case t => println("Could not process file: " + t.getMessage) } 
+52
scala concurrency
Apr 30 '13 at 16:15
source share
13 answers

You only get timeouts when you use locking to get Future results. If you want to use the non-blocking onComplete , onSuccess or onFailure , then you have to minimize your own timeout processing. Akka has built timeout processing for messaging / response ( ? ) Between participants, but I'm not sure if you want to start using Akka. FWIW, in Akka, to handle the timeout, they compose two Futures together through Future.firstCompletedOf , one of which is the real asynchronous call task, and one that represents the timeout. If the timeout timer (via HashedWheelTimer ) appears first, you are denied an async callback.

A very simplified example of how you can flip your own might look something like this. First, an object for a timeout schedule:

 import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout} import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration import scala.concurrent.Promise import java.util.concurrent.TimeoutException object TimeoutScheduler{ val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS) def scheduleTimeout(promise:Promise[_], after:Duration) = { timer.newTimeout(new TimerTask{ def run(timeout:Timeout){ promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis")) } }, after.toNanos, TimeUnit.NANOSECONDS) } } 

Then a function to take the Future and add a wait time to it:

 import scala.concurrent.{Future, ExecutionContext, Promise} import scala.concurrent.duration.Duration def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = { val prom = Promise[T]() val timeout = TimeoutScheduler.scheduleTimeout(prom, after) val combinedFut = Future.firstCompletedOf(List(fut, prom.future)) fut onComplete{case result => timeout.cancel()} combinedFut } 

Note that the HashedWheelTimer that I use here is Netty.

+67
Apr 30 '13 at 16:48
source share

I just created a TimeoutFuture class for a colleague:

Timeoutfuture

 package model import scala.concurrent._ import scala.concurrent.duration._ import play.libs.Akka import play.api.libs.concurrent.Execution.Implicits._ object TimeoutFuture { def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = { val prom = promise[A] // timeout logic Akka.system.scheduler.scheduleOnce(timeout) { prom tryFailure new java.util.concurrent.TimeoutException } // business logic Future { prom success block } prom.future } } 

Using

 val future = TimeoutFuture(10 seconds) { // do stuff here } future onComplete { case Success(stuff) => // use "stuff" case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block) } 

Notes:

  • Assumes a game! (but itโ€™s easy enough to adapt).
  • Each piece of code works in the same ExecutionContext , which may not be ideal.
+23
Jun 25 '13 at 0:15
source share

All of these answers require additional dependencies. I decided to write a version using java.util.Timer, which is an effective way to run a function in the future, in this case, to start a timeout.

Blog post with more details here

Using this with Scala Promise, we can create a future with a wait time as follows:

 package justinhj.concurrency import java.util.concurrent.TimeoutException import java.util.{Timer, TimerTask} import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future, Promise} import scala.language.postfixOps object FutureUtil { // All Future that use futureWithTimeout will use the same Timer object // it is thread safe and scales to thousands of active timers // The true parameter ensures that timeout timers are daemon threads and do not stop // the program from shutting down val timer: Timer = new Timer(true) /** * Returns the result of the provided future within the given time or a timeout exception, whichever is first * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a * Thread.sleep would * @param future Caller passes a future to execute * @param timeout Time before we return a Timeout exception instead of future outcome * @return Future[T] */ def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = { // Promise will be fulfilled with either the callers Future or the timer task if it times out val p = Promise[T] // and a Timer task to handle timing out val timerTask = new TimerTask() { def run() : Unit = { p.tryFailure(new TimeoutException()) } } // Set the timeout to check in the future timer.schedule(timerTask, timeout.toMillis) future.map { a => if(p.trySuccess(a)) { timerTask.cancel() } } .recover { case e: Exception => if(p.tryFailure(e)) { timerTask.cancel() } } p.future } } 
+19
Jul 24 '17 at 4:22
source share

The playback structure contains Promise.timeout, so you can write code as shown below.

 private def get(): Future[Option[Boolean]] = { val timeoutFuture = Promise.timeout(None, Duration("1s")) val mayBeHaveData = Future{ // do something Some(true) } // if timeout occurred then None will be result of method Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture)) } 
+5
Dec 25 '14 at 10:14
source share

You can specify the wait time when you wait in the future:

For scala.concurrent.Future the result method allows you to specify a timeout.

For scala.actors.Future , Futures.awaitAll allows Futures.awaitAll to specify a timeout.

I do not think that there is a timeout built into the fulfillment of the Future.

+3
Apr 30 '13 at 16:23
source share

I am completely surprised that this is not standard in Scala. My versions are short and have no dependencies

 import scala.concurrent.Future sealed class TimeoutException extends RuntimeException object FutureTimeout { import scala.concurrent.ExecutionContext.Implicits.global implicit class FutureTimeoutLike[T](f: Future[T]) { def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future { Thread.sleep(ms) throw new TimeoutException })) lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout } } 

Usage example

 import FutureTimeout._ Future { /* do smth */ } withTimeout 
+3
Apr 10 '15 at 14:50
source share

No one mentioned akka-streams . Streams have a simple completionTimeout method, and applying this to a single-source stream works like the future.

But, akka-streams also cancels, so it can actually finish starting the source, that is, it signals a timeout for the source.

+3
Sep 21 '16 at 13:40
source share

If you want the author (the owner of the promises) to be the one who controls the timeout logic, use akka.pattern.after , as follows:

 val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during..."))) Future.firstCompletedOf(Seq(promiseRef.future, timeout)) 

Thus, if your logic for completing a promise never materializes, your callerโ€™s future will still be terminated at some point with a failure.

+3
Feb 26 '17 at 12:17
source share

Monix Task has support timeout

 import monix.execution.Scheduler.Implicits.global import monix.eval._ import scala.concurrent.duration._ import scala.concurrent.TimeoutException val source = Task("Hello!").delayExecution(10.seconds) // Triggers error if the source does not complete in 3 seconds after runOnComplete val timedOut = source.timeout(3.seconds) timedOut.runOnComplete(r => println(r)) //=> Failure(TimeoutException) 
+1
May 16 '18 at 9:19
source share

I use this version (based on the Play example above) which uses the Akka system manager:

 object TimeoutFuture { def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = { implicit val executionContext = system.dispatcher val prom = Promise[A] // timeout logic system.scheduler.scheduleOnce(timeout) { prom tryFailure new java.util.concurrent.TimeoutException } // business logic Future { try { prom success block } catch { case t: Throwable => prom tryFailure t } } prom.future } } 
0
Jan 19 '18 at 15:03
source share

This version works without using an external timer (just Await.result)

 import scala.concurrent._ import scala.concurrent.duration.FiniteDuration object TimeoutFuture { def apply[A]( timeout: FiniteDuration )(block: => A)(implicit executor: ExecutionContext): Future[A] = try { Future { Await.result(Future { block }, timeout) } } catch { case _: TimeoutException => Future.failed(new TimeoutException(s"Timed out after ${timeout.toString}")) } } 
0
Feb 07 '19 at 13:24
source share

The easiest way to specify a timeout in Future IMO is with the built-in scala engine using scala.concurrent.Await.ready This will TimeoutException if Future takes longer than the specified timeout. Otherwise, it will return the Future itself. Here is a simple contrived example

 import scala.concurrent.ExecutionContext.Implicits._ import scala.concurrent.duration._ val f1: Future[Int] = Future { Thread.sleep(1100) 5 } val fDoesntTimeout: Future[Int] = Await.ready(f1, 2000 milliseconds) val f: Future[Int] = Future { Thread.sleep(1100) 5 } val fTimesOut: Future[Int] = Await.ready(f, 100 milliseconds) 
0
Apr 25 '19 at 12:30
source share

You can wait until the end using Await .

 import scala.concurrent.duration._ import scala.concurrent.{Await, Future} val meaningOfLife: Int = Await.result(Future(42), 1.nano) println (meaningOfLife) 

The above prints 42

You may need an implicit ExecutionContext , in which case just add:

 import scala.concurrent.ExecutionContext.Implicits.global 



Another way to do this is to use Coeval from monix . This method does not work in all situations, and you can read all about it here . The basic idea is that sometimes the future does not take much time and returns the result of a synchronous function or value call, so this future can be estimated in the current thread. It is also useful for testing and falsifying the future. In addition, you do not need to specify the expected timeout, but still it's nice not to worry about it.

You start by transforming the future into Task , then wrap this task in Coeval , and then cross your fingers, waiting for what you get. This is a very simple example to show how this works:

You need an implicit Scheduler to be able to use it:

 import monix.execution.Scheduler.Implicits.global 


 Coeval(Task.fromFuture(Future (42)).runSyncStep).value() match { case Right(v) => println(v) case Left(task) => println("Task did not finish") } 

The above ends and prints 42 to the console.

 Coeval(Task.fromFuture(Future { scala.concurrent.blocking { 42 } }).runSyncStep).value() match { case Right(v) => println(v) case Left(task) => println("Task did not finish") } 

This example prints Task did not finish :

0
Sep 23 '19 at 2:13
source share



All Articles