How to create a block execution context?

For some reason, I cannot fool this idea. I have an application that works with Play that invokes Elastic Search . As part of my project, my service uses the Java API wrapped in the future of scala, as shown in this blog . I updated the code from this post to hint at ExecutionContext that it will do blocking I / O like this:

    import scala.concurent.{blocking, Future, Promise}
    import org.elasticsearch.action.{ActionRequestBuilder, ActionListener, ActionResponse }
    def execute[RB <: ActionRequestBuilder[_, T, _, _]](request: RB): Future[T] = {
        blocking {
            request.execute(this)
            promise.future
        }
    }

My actual service, which builds requests for sending to ES, accepts execContext as a constructor parameter, which it then uses for calls to search for elasticity. I made it so that the global execution context that uses uses will not have threads associated with blocking calls in ES. This SO comment mentions that only the global context blocks information, so I have to create my own. There is a lot of information in the same post / answer about using the ForkJoin pool, but I'm not sure how to take what is written in those documents and combine it with the hints in the locking documentation to create an execution context that responds to the blocking hints.

, , , , , ? , , , - :

, " ", . - , , . , , , , .

, , ForkJoin, - -? - ? , , , , Neophytes , :

.

, , , , ExecutionContext, . , / blocking ?

, , :

, , , .


: , , ElasticSearch:

//Note that these are not implicit parameters!
class ElasticSearchService(otherParams ..., val executionContext: ExecutionContext)

- :

object Global extends GlobalSettings {
    val elasticSearchContext = //Custom Context goes here
    ...
    val elasticSearchService = new ElasticSearchService(params, elasticSearchContext);
    ...
}

, , , , , , , BlockContext.

+4
1

, , , -

. IO, . , Scala:

:

object Contexts {
    implicit val myExecutionContext: ExecutionContext = Akka.system.dispatchers.lookup("my-context")
}

Akka, , , , . ForkJoinPool, - managedBlock(blocker). , :

. ForkJoinWorkerThread, , , , , parallelism, .

, , ForkJoinWorkerThread, , , , , . ForkJoinPool, , factory:

val defaultForkJoinWorkerThreadFactory: ForkJoinWorkerThreadFactory = juc.ForkJoinPool.defaultForkJoinWorkerThreadFactory

, Akka, , .

, Akka, , :

my-context {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 8
    parallelism-factor = 3.0
    parallelism-max = 64
    task-peeking-mode = "FIFO"
  }
  throughput = 100
}

- , .

, blocking managedBlock ForkJoin ThreadPoolBuilder

private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext {
    override def blockOn[T](thunk: โ‡’ T)(implicit permission: CanAwait): T = {
      val result = new AtomicReference[Option[T]](None)
      ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
        def block(): Boolean = {
          result.set(Some(thunk))
          true
        }
        def isReleasable = result.get.isDefined
      })
      result.get.get // Exception intended if None
    }
  }

, , -, BlockContext. , , ExecutorServiceFactory, executor . , , WorkerThread ExecutorServiceFactory, workthread, , .

, , Akka forkjoin:)

0

Source: https://habr.com/ru/post/1626384/


All Articles