Despite the fact that you already have an excellent answer, I thought I could still offer an opinion on this.
I remember reading somewhere (probably on someone's blog) the suggestion to "use actors for state and use futures for concurrency".
So my first thought was to try actors. To be precise, I would have a master actor with a router launching several worker actors, with the number of workers bounded by allowableParallelism . So, if I have
def doWorkInternal (symbol: String): Unit
which does the work of your doWork taken "outside the future", I would have something along these lines (very rudimentary, glossing over many details, and practically copying code from the Akka documentation):
import akka.actor._
import akka.routing.{ ActorRefRoutee, RoundRobinRoutingLogic, Router }

case class WorkItem (symbol: String)
case class WorkItemCompleted (symbol: String)
case class WorkLoad (symbols: Array[String])
case class WorkLoadCompleted ()

class Worker extends Actor {
  def receive = {
    case WorkItem (symbol) =>
      doWorkInternal (symbol)
      sender () ! WorkItemCompleted (symbol)
  }
}

class Master extends Actor {
  var pending = Set[String] ()
  var originator: Option[ActorRef] = None

  var router = {
    val routees = Vector.fill (allowableParallelism) {
      val r = context.actorOf (Props[Worker])
      context watch r
      ActorRefRoutee (r)
    }
    Router (RoundRobinRoutingLogic (), routees)
  }

  def receive = {
    case WorkLoad (symbols) =>
      originator = Some (sender ())
      context become processing
      for (symbol <- symbols) {
        router.route (WorkItem (symbol), self)
        pending += symbol
      }
  }

  def processing: Receive = {
    // Replace a crashed worker so the pool stays at allowableParallelism.
    case Terminated (a) =>
      router = router.removeRoutee (a)
      val r = context.actorOf (Props[Worker])
      context watch r
      router = router.addRoutee (r)
    case WorkItemCompleted (symbol) =>
      pending -= symbol
      if (pending.isEmpty) {
        context become receive
        originator.get ! WorkLoadCompleted ()
      }
  }
}
You can then send the master actor a WorkLoad via ask and get WorkLoadCompleted back as a future.
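For illustration, a minimal sketch of the ask pattern (assuming classic, untyped Akka actors as above). Note that ToyMaster here is a hypothetical stand-in that replies immediately, and the messages are redefined locally, purely so the snippet is self-contained; the real Master above needs doWorkInternal and allowableParallelism in scope:

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

case class WorkLoad (symbols: Array[String])
case object WorkLoadCompleted

// Toy stand-in: replies at once, where the real Master would reply
// only after its `pending` set drains.
class ToyMaster extends Actor {
  def receive = {
    case WorkLoad (_) => sender () ! WorkLoadCompleted
  }
}

implicit val timeout: Timeout = Timeout (30.seconds)
val system = ActorSystem ("demo")
val master = system.actorOf (Props[ToyMaster], "master")

// `?` (ask) returns a Future[Any]; narrow it with mapTo.
val completed = (master ? WorkLoad (Array ("MSFT", "GOOG"))).mapTo[WorkLoadCompleted.type]
val result = Await.result (completed, timeout.duration)
system.terminate ()
```

The future fails with an AskTimeoutException if no reply arrives within the timeout, which is why ask always needs an implicit Timeout in scope.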
But, thinking about it more, I felt that the "state" (the number of requests currently in processing) wants to be hidden away somewhere, together with the code that keeps it from exceeding the limit. So here is something along the lines of a "future gateway intermediary", if you don't mind an imperative style and mutable (internal-only) structures:
import scala.collection.mutable
import scala.concurrent.{ Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global // or your own ExecutionContext
import scala.util.Try

object Guardian {
  private val incoming = new mutable.HashMap[String, Promise[Unit]] ()
  private val outgoing = new mutable.HashMap[String, Future[Unit]] ()
  private val pending = new mutable.Queue[String]

  def doWorkGuarded (symbol: String): Future[Unit] = synchronized {
    val p = Promise[Unit] ()
    incoming (symbol) = p
    if (incoming.size <= allowableParallelism)
      launchWork (symbol)
    else
      pending.enqueue (symbol)
    p.future
  }

  private def completionHandler (t: Try[Unit]): Unit = synchronized {
    // Hand completed results over to their promises; .toList snapshots
    // the key set so we can remove entries while iterating.
    for (symbol <- outgoing.keySet.toList) {
      val f = outgoing (symbol)
      if (f.isCompleted) {
        incoming (symbol).completeWith (f)
        incoming.remove (symbol)
        outgoing.remove (symbol)
      }
    }
    // Refill the freed slots, never exceeding allowableParallelism.
    while (outgoing.size < allowableParallelism && pending.nonEmpty) {
      val symbol = pending.dequeue ()
      launchWork (symbol)
    }
  }

  private def launchWork (symbol: String): Unit = {
    val f = doWork (symbol)
    outgoing (symbol) = f
    f.onComplete (completionHandler)
  }
}
doWork now matches yours exactly, returning Future[Unit] , the idea being that instead of using something like
val futures = symbols.map (doWork (_)).toSeq
val future = Future.sequence (futures)
which launches the futures with no regard for allowableParallelism at all, I would instead use
val futures = symbols.map (Guardian.doWorkGuarded (_)).toSeq
val future = Future.sequence (futures)
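As a side note: if the Guardian machinery feels heavy, a simpler (if cruder) way to respect allowableParallelism with just the standard library is to process the symbols in fixed-size batches; the trade-off is that one slow item then holds up its whole batch, which the Guardian avoids. A self-contained sketch, where doWork is a stand-in that just sleeps and records peak concurrency so the cap can be observed:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val allowableParallelism = 2
val running = new AtomicInteger (0)
val observedMax = new AtomicInteger (0)

// Stand-in doWork: sleeps briefly and tracks how many copies run at once.
def doWork (symbol: String): Future[Unit] = Future {
  val n = running.incrementAndGet ()
  observedMax.getAndUpdate ((m: Int) => math.max (m, n))
  Thread.sleep (20)
  running.decrementAndGet ()
  ()
}

// Chain the batches: batch n+1 is launched only after batch n completes,
// so at most allowableParallelism futures are in flight at any time.
def doWorkBatched (symbols: Seq[String]): Future[Unit] =
  symbols.grouped (allowableParallelism).foldLeft (Future.unit) { (acc, batch) =>
    acc.flatMap (_ => Future.sequence (batch.map (doWork)).map (_ => ()))
  }

Await.result (doWorkBatched (Vector ("A", "B", "C", "D", "E")), 10.seconds)
```

This keeps no shared mutable state beyond the counters (which exist only for the demonstration), but it wastes capacity at the tail of each batch, so for uneven work items the Guardian's queue-and-refill approach remains the better fit.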
Think of some hypothetical non-blocking database access driver, i.e. one that returns futures for requests and limits their concurrency internally, being built, say, on top of some connection pool. You would not want such a driver to return futures that disregard the parallelism limit, forcing you to juggle them yourself to keep it under control.
This example is more illustrative than practical, since I would normally not expect an outgoing interface to use futures this way (it is quite OK for an incoming interface, though).