Sequencing Scala Futures with bounded parallelism (without messing with ExecutionContexts)

Background: I have a function:

def doWork(symbol: String): Future[Unit] 

which triggers some side effects to fetch and store data, and completes the returned future when done. However, the underlying infrastructure has usage limits, so no more than 5 of these requests can run in parallel. I have a list of N symbols that I need to go through:

  var symbols = Array("MSFT",...) 

but I want to arrange them so that no more than 5 are executed simultaneously. Given:

  val allowableParallelism = 5 

my current solution (assuming I'm working with async / await):

  val symbolChunks = symbols.toList.grouped(allowableParallelism).toList

  def toThunk(x: List[String]) = () => Future.sequence(x.map(doWork))

  val symbolThunks = symbolChunks.map(toThunk)

  val done = Promise[Unit]()

  def procThunks(x: List[() => Future[List[Unit]]]): Unit = x match {
    case Nil     => done.success(())
    case x :: xs => x().onComplete(_ => procThunks(xs))
  }

  procThunks(symbolThunks)

  await { done.future }

but for obvious reasons I'm not very happy with this. I feel this should be possible with folds, but every time I try, I end up creating the futures eagerly. I also tried a version with RxScala Observables using concatMap, but that also seemed like overkill.

Is there a better way to do this?

3 answers

I have an example of how to do this using scalaz-stream. It is quite a lot of code, because you need to convert a Scala Future to a scalaz Task (an abstraction for deferred computation). However, you only have to add this to your project once. Another option is to use Task to define doWork; I personally prefer Task for composing asynchronous programs.

  import scala.concurrent.{Future => SFuture}
  import scala.util.Random
  import scala.concurrent.ExecutionContext.Implicits.global

  import scalaz.stream._
  import scalaz.concurrent._

  val P = scalaz.stream.Process

  val rnd = new Random()

  def doWork(symbol: String): SFuture[Unit] = SFuture {
    Thread.sleep(rnd.nextInt(1000))
    println(s"Symbol: $symbol. Thread: ${Thread.currentThread().getName}")
  }

  val symbols = Seq("AAPL", "MSFT", "GOOGL", "CVX").
    flatMap(s => Seq.fill(5)(s).zipWithIndex.map(t => s"${t._1}${t._2}"))

  implicit class Transformer[+T](fut: => SFuture[T]) {
    def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = {
      import scala.util.{Failure, Success}
      import scalaz.syntax.either._
      Task.async { register =>
        fut.onComplete {
          case Success(v)  => register(v.right)
          case Failure(ex) => register(ex.left)
        }
      }
    }
  }

  implicit class ConcurrentProcess[O](val process: Process[Task, O]) {
    def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task, O, O2]): Process[Task, O2] = {
      val actions = process.zipWith(f)((data, f) => f(data))
      val nestedActions = actions.map(P.eval)
      merge.mergeN(concurrencyLevel)(nestedActions)
    }
  }

  val workChannel = io.channel((s: String) => doWork(s).toTask)

  val process = Process.emitAll(symbols).concurrently(5)(workChannel)

  process.run.run

Once you have all these conversions in scope, basically all you need is:

  val workChannel = io.channel((s: String) => doWork(s).toTask)

  val process = Process.emitAll(symbols).concurrently(5)(workChannel)

Pretty short and self-describing.


Even though you already have an excellent answer, I thought I might still offer an opinion or two on the matter.

I remember reading somewhere (on someone's blog): "use actors for state and use futures for concurrency".

So my first thought was to try this with actors. Specifically, I would have a master actor with a router launching several worker actors, with the number of workers bounded by allowableParallelism . So, if I have

 def doWorkInternal (symbol: String): Unit 

which does the work of your doWork taken "outside the future", I would have something along these lines (very rudimentary, not taking many details into account, and practically copying code from the akka documentation):

 import akka.actor._
 import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}

 case class WorkItem(symbol: String)
 case class WorkItemCompleted(symbol: String)
 case class WorkLoad(symbols: Array[String])
 case class WorkLoadCompleted()

 class Worker extends Actor {
   def receive = {
     case WorkItem(symbol) =>
       doWorkInternal(symbol)
       sender() ! WorkItemCompleted(symbol)
   }
 }

 class Master extends Actor {
   var pending = Set[String]()
   var originator: Option[ActorRef] = None

   var router = {
     val routees = Vector.fill(allowableParallelism) {
       val r = context.actorOf(Props[Worker])
       context watch r
       ActorRefRoutee(r)
     }
     Router(RoundRobinRoutingLogic(), routees)
   }

   def receive = {
     case WorkLoad(symbols) =>
       originator = Some(sender())
       context become processing
       for (symbol <- symbols) {
         router.route(WorkItem(symbol), self)
         pending += symbol
       }
   }

   def processing: Receive = {
     case Terminated(a) =>
       // replace a crashed worker with a fresh one
       router = router.removeRoutee(a)
       val r = context.actorOf(Props[Worker])
       context watch r
       router = router.addRoutee(r)
     case WorkItemCompleted(symbol) =>
       pending -= symbol
       if (pending.isEmpty) {
         context become receive
         originator.get ! WorkLoadCompleted()
       }
   }
 }

You can then query the master actor with ask and receive WorkLoadCompleted as a future.

But, thinking more about the "state" (the count of requests currently in processing) that has to be hidden somewhere, along with the code needed to keep it from being exceeded, here is something in the spirit of a "future gateway intermediary", if you don't mind an imperative style using mutable (internal-only) structures:

 object Guardian {
   private val incoming = new collection.mutable.HashMap[String, Promise[Unit]]()
   private val outgoing = new collection.mutable.HashMap[String, Future[Unit]]()
   private val pending = new collection.mutable.Queue[String]

   def doWorkGuarded(symbol: String): Future[Unit] = synchronized {
     val p = Promise[Unit]()
     incoming(symbol) = p
     // launch only while fewer than allowableParallelism futures are in flight
     if (outgoing.size < allowableParallelism)
       launchWork(symbol)
     else
       pending.enqueue(symbol)
     p.future
   }

   private def completionHandler(t: Try[Unit]): Unit = synchronized {
     for (symbol <- outgoing.keySet) {
       val f = outgoing(symbol)
       if (f.isCompleted) {
         incoming(symbol).completeWith(f)
         incoming.remove(symbol)
         outgoing.remove(symbol)
       }
     }
     // refill the pool of running futures up to allowableParallelism
     for (_ <- outgoing.size until allowableParallelism) {
       if (pending.nonEmpty) {
         val symbol = pending.dequeue()
         launchWork(symbol)
       }
     }
   }

   private def launchWork(symbol: String): Unit = {
     val f = doWork(symbol)
     outgoing(symbol) = f
     f.onComplete(completionHandler)
   }
 }

doWork now looks exactly like yours, returning Future[Unit] , with the idea that instead of using something like

 val futures = symbols.map(doWork(_)).toSeq
 val future = Future.sequence(futures)

which would launch futures with no regard for allowableParallelism , I would instead use

 val futures = symbols.map(Guardian.doWorkGuarded(_)).toSeq
 val future = Future.sequence(futures)

Think of some hypothetical non-blocking database access driver, i.e. one returning futures for requests whose concurrency is limited because it is built, for example, on top of some connection pool: you would not want it to hand back futures that ignore the parallelism limit and force you to juggle them yourself to keep parallelism under control.

This example is more illustrative than practical, since I would normally not expect an "outgoing" interface to use futures like this (it is quite OK for an "incoming" interface, though).


First, obviously some purely functional wrapper around Scala's Future is needed, because Future is side-effecting and starts running as soon as it is created. Let's call it Deferred :

 import scala.concurrent.Future
 import scala.util.control.Exception.nonFatalCatch

 class Deferred[+T](f: () => Future[T]) {
   def run(): Future[T] = f()
 }

 object Deferred {
   def apply[T](future: => Future[T]): Deferred[T] =
     new Deferred(() => nonFatalCatch.either(future).fold(Future.failed, identity))
 }
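To see the difference in eagerness, here is a small self-contained sketch (the Deferred definition repeated, plus a hypothetical demo not from the answer itself) showing that a plain Future starts immediately on construction, while a Deferred does nothing until run() is called:

```scala
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.Exception.nonFatalCatch

class Deferred[+T](f: () => Future[T]) {
  def run(): Future[T] = f()
}

object Deferred {
  def apply[T](future: => Future[T]): Deferred[T] =
    new Deferred(() => nonFatalCatch.either(future).fold(Future.failed, identity))
}

val started = new AtomicBoolean(false)

// `future` is a by-name parameter, so nothing is evaluated here yet.
val d = Deferred(Future { started.set(true); 42 })

Thread.sleep(200)
val startedBeforeRun = started.get // false: the computation is still deferred

val result = Await.result(d.run(), 5.seconds) // running it now yields 42
println(s"startedBeforeRun=$startedBeforeRun, result=$result")
```

The by-name parameter in Deferred.apply is what suspends the eager Future construction until run() is invoked.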

And here is the routine:

 import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.immutable.Seq
 import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.util.{Failure, Success}

 trait ConcurrencyUtils {

   def runWithBoundedParallelism[T](parallelism: Int = Runtime.getRuntime.availableProcessors())
                                   (operations: Seq[Deferred[T]])
                                   (implicit ec: ExecutionContext): Deferred[Seq[T]] =
     if (parallelism > 0) Deferred {
       val indexedOps = operations.toIndexedSeq // index for faster access

       val promise = Promise[Seq[T]]()

       val acc = new CopyOnWriteArrayList[(Int, T)] // concurrent accumulator
       val nextIndex = new AtomicInteger(parallelism) // track the next index atomically

       def run(operation: Deferred[T], index: Int): Unit = {
         operation.run().onComplete {
           case Success(value) =>
             acc.add((index, value)) // accumulate the result value
             if (acc.size == indexedOps.size) { // we're done
               import scala.collection.JavaConversions._
               // in a concurrent setting this line may be reached multiple times,
               // hence trySuccess instead of success
               promise.trySuccess(acc.view.sortBy(_._1).map(_._2).toList)
             } else {
               val next = nextIndex.getAndIncrement() // get and increment atomically
               if (next < indexedOps.size) { // run the next operation, if any
                 run(indexedOps(next), next)
               }
             }
           case Failure(t) =>
             promise.tryFailure(t) // same here: may be called multiple times
         }
       }

       if (operations.nonEmpty) {
         indexedOps.view.take(parallelism).zipWithIndex.foreach((run _).tupled) // run as many as allowed
         promise.future
       } else {
         Future.successful(Seq.empty)
       }
     } else {
       throw new IllegalArgumentException("Parallelism must be positive")
     }
 }

In short, we start as many operations as are initially allowed, and then, each time an operation completes, we launch the next one, if any. So the only difficulty here is maintaining the next operation index and the results store in a concurrent setting. I am not an absolute concurrency expert, so let me know if there are potential problems in the code above. Notice that the returned value is also a deferred computation, which has to be run .
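The same "start the next one from each completion callback" scheme can also be checked with a standard-library-only sketch ( boundedTraverse is a hypothetical helper written for this illustration, not code from the answer above). It starts parallelism futures up front, launches the next pending one whenever a future completes, and we track how many are in flight to confirm the bound is respected:

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical stdlib-only helper following the same scheme: start
// `parallelism` operations, then launch the next pending one from
// each completion callback.
def boundedTraverse[A, B](items: Seq[A], parallelism: Int)(f: A => Future[B])
                         (implicit ec: ExecutionContext): Future[Seq[B]] = {
  val indexed = items.toIndexedSeq
  if (indexed.isEmpty) Future.successful(Seq.empty)
  else {
    val results   = new Array[Any](indexed.size)       // slot per input, keeps order
    val nextIndex = new AtomicInteger(parallelism)     // next operation to start
    val remaining = new AtomicInteger(indexed.size)    // operations still unfinished
    val promise   = Promise[Seq[B]]()

    def run(i: Int): Unit =
      f(indexed(i)).onComplete {
        case Success(v) =>
          results(i) = v
          if (remaining.decrementAndGet() == 0)
            promise.trySuccess(results.toVector.asInstanceOf[Vector[B]])
          else {
            val next = nextIndex.getAndIncrement()
            if (next < indexed.size) run(next) // chain the next operation
          }
        case Failure(t) => promise.tryFailure(t)
      }

    indexed.indices.take(parallelism).foreach(run) // kick off the first batch
    promise.future
  }
}

// Instrumented stand-in for doWork, counting concurrent executions.
val inFlight = new AtomicInteger(0)
val maxInFlight = new AtomicInteger(0)

def doWork(symbol: String): Future[Unit] = Future {
  val now = inFlight.incrementAndGet()
  maxInFlight.getAndUpdate(m => math.max(m, now))
  Thread.sleep(50)
  inFlight.decrementAndGet()
  ()
}

val symbols = (1 to 20).map(i => s"SYM$i")
Await.result(boundedTraverse(symbols, 5)(doWork), 30.seconds)
println(s"max in flight: ${maxInFlight.get}")
```

Since a new operation is only ever started from the completion callback of a finished one, the number in flight can never exceed the initial batch size of 5.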

Use and testing:

 import org.scalatest.{FlatSpec, Matchers}
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.time.{Seconds, Span}

 import scala.collection.immutable.Seq
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.concurrent.duration._

 class ConcurrencyUtilsSpec extends FlatSpec
   with Matchers
   with ScalaFutures
   with ConcurrencyUtils {

   "runWithBoundedParallelism" should "return results in correct order" in {
     val comp1 = mkDeferredComputation(1)
     val comp2 = mkDeferredComputation(2)
     val comp3 = mkDeferredComputation(3)
     val comp4 = mkDeferredComputation(4)
     val comp5 = mkDeferredComputation(5)

     val compoundComp = runWithBoundedParallelism(2)(Seq(comp1, comp2, comp3, comp4, comp5))

     whenReady(compoundComp.run()) { result =>
       result should be (Seq(1, 2, 3, 4, 5))
     }
   }

   // increase the default ScalaTest patience
   implicit val defaultPatience =
     PatienceConfig(timeout = Span(10, Seconds))

   private def mkDeferredComputation[T](result: T,
                                        sleepDuration: FiniteDuration = 100.millis): Deferred[T] =
     Deferred {
       Future {
         Thread.sleep(sleepDuration.toMillis)
         result
       }
     }
 }

Source: https://habr.com/ru/post/1207499/

