Is it possible to use actor’s blocking messages when they are wrapped in the future?

My current application is based on akka 1.1. He has several ProjectAnalysisActors , each of whom is responsible for processing analysis tasks for a specific project. The analysis begins when such an actor receives a general start message. Upon completion of one step, he sends himself a message with the next step, as soon as it is defined. The executing code is as follows

 sealed trait AnalysisEvent { def run(project: Project): Future[Any] def nextStep: AnalysisEvent = null } case class StartAnalysis() extends AnalysisEvent { override def run ... override def nextStep: AnalysisEvent = new FirstStep } case class FirstStep() extends AnalysisEvent { override def run ... override def nextStep: AnalysisEvent = new SecondStep } case class SecondStep() extends AnalysisEvent { ... } class ProjectAnalysisActor(project: Project) extends Actor { def receive = { case event: AnalysisEvent => val future = event.run(project) future.onComplete { f => self ! event.nextStep } } } 

I have some difficulties with how to implement my code for run methods for each step of the analysis. At the moment, I am creating a new future in every run method. In this future, I send all subsequent messages to different subsystems. Some of them are non-blocking fire-and-forget messages, but some of them return a result that must be saved before the start of the next stage of analysis.

Currently, a typical startup method is as follows

 def run(project: Project): Future[Any] = { Future { progressActor ! typicalFireAndForget(project.name) val calcResult = (calcActor1 !! doCalcMessage(project)).getOrElse(...) val p: Project = ... // created updated project using calcResult val result = (storage !! updateProjectInformation(p)).getOrElse(...) result } } 

Since these blocking messages should be avoided, I am wondering if this is correct. Does it make sense to use them in this use case or am I still to avoid this? If so, would that be the right decision?

+6
source share
1 answer

Apparently, the only goal of ProjectAnalysisActor is to bind future calls. Secondly, startup methods also wait for results to continue computing.

So, I think you can try reorganizing your code to use Future Composition, as described here: http://akka.io/docs/akka/1.1/scala/futures.html

 def run(project: Project): Future[Any] = { progressActor ! typicalFireAndForget(project.name) for( calcResult <- calcActor1 !!! doCalcMessage(project); p = ... // created updated project using calcResult result <- storage !!! updateProjectInformation(p) ) yield ( result ) } 
+7
source

Source: https://habr.com/ru/post/892677/


All Articles