Akka TypedActor - how to handle asynchronous responses with context correctly

I started working with TypedActors in Scala and ran into a problem doing something pretty simple: I want Actor A to call a method on Actor B and process the result in an anonymous function on Actor A, but ensuring that

  • My response processing function is thread safe, for example. will not start simultaneously with any other threads accessing the Actor as a state
  • My response function can reference the context of Actor A

How can I (or can I) satisfy both of these requirements?

For example, this actor simply wants to call the API on otherActor, which returns Future [Int], and updates its state with the result, and then does something that requires the context of the actor:

class MyActorImpl extends MyActor {

  // my mutable state
  var myNumber = 0

  // method proxied by TypedActor ref:
  def doStuff(otherActor: OtherActor): Unit = {
    otherActor.doOtherStuff onSuccess {
      // oops this is no longer running in MyActorImpl..
      // this could be on a concurrent thread if we
      case i => processResult(i)
    }
  }

  private def processResult(i: Int): Unit = {
    myNumber = 0 // oops, now we are possibly making a concurrent modification
    println(s"Got $i")

    // fails with java.lang.IllegalStateException: Calling TypedActor.context
    // outside of a TypedActor implementation method!
    println(s"My context is ${TypedActor.context}")
  }
}

? , -, ? , "private" (, processResult) .

, Scala REPL:

import akka.actor._
import scala.concurrent._

val system = ActorSystem("mySystem")
import system.dispatcher

trait OtherActor {
  def doOtherStuff(): Future[Int]
}


trait MyActor {
  def doStuff(otherActor: OtherActor): Unit
}

class OtherActorImpl extends OtherActor {
  var i = 0
  def doOtherStuff(): Future[Int] = {
    i += 1
    Future {i} 
  }
}

class MyActorImpl extends MyActor {

  // my mutable state
  var myNumber = 0

  // method proxied by TypedActor ref:
  def doStuff(otherActor: OtherActor): Unit = {
    otherActor.doOtherStuff onSuccess {
      // oops this is no longer running in MyActorImpl..
      // this could be on a concurrent thread if we
      case i => processResult(i)
    }
  }

  private def processResult(i: Int): Unit = {
    myNumber = 0 // oops, now we are possibly making a concurrent modification
    println(s"Got $i")

    // fails with java.lang.IllegalStateException: Calling TypedActor.context
    // outside of a TypedActor implementation method!
    println(s"My context is ${TypedActor.context}")
  }
}

val actor1: MyActor = TypedActor(system).typedActorOf(TypedProps[MyActorImpl])
val actor2: OtherActor = TypedActor(system).typedActorOf(TypedProps[OtherActorImpl])

actor1.doStuff(actor2)
+4

Source: https://habr.com/ru/post/1538161/


All Articles