Akka: testing monitoring / death watch

In my scenario, I have two actors:

  • watchee (I use a TestProbe)
  • watcher (a WatcherActor wrapped in a TestActorRef, to expose some internal state that I track in my test)

The watcher must take some action when the watchee dies.

Here is the complete test case I have written so far:

    import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}
    import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
    import org.scalatest.{BeforeAndAfterAll, FunSuiteLike, Matchers}

    class TempTest(_system: ActorSystem) extends TestKit(_system)
        with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {

      def this() = this(ActorSystem("TempTest"))

      override def afterAll(): Unit = {
        TestKit.shutdownActorSystem(system)
      }

      class WatcherActor(watchee: ActorRef) extends Actor {
        var state = "initial"
        context.watch(watchee)

        override def receive: Receive = {
          case "start"       => state = "start"
          case _: Terminated => state = "terminated"
        }
      }

      test("example") {
        val watchee = TestProbe()
        val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))

        assert(watcher.underlyingActor.state === "initial")

        watcher ! "start" // "start" will be sent and handled by watcher synchronously
        assert(watcher.underlyingActor.state === "start")

        system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
        Thread.sleep(100) // what is the best way to avoid blocking here?
        assert(watcher.underlyingActor.state === "terminated")
      }
    }

Now, since all involved participants use CallingThreadDispatcher (all of Akka's test helpers are constructed using props with .withDispatcher(CallingThreadDispatcher.Id)), I can safely assume that when this statement returns:

 watcher ! "start" 

... the "start" message has already been processed by WatcherActor, and therefore I can make assertions based on watcher.underlyingActor.state.

However, based on my observations, when I stop the watchee using system.stop or by sending Kill to it, the Terminated message produced as a side effect of the watchee's death is processed asynchronously, in a different thread.

A non-solution is to stop the watchee, block the thread for some time, and check the watcher's state after that, but I would like to know how to do this the right way (that is, how can I be sure that, after killing an actor, its watcher has received and processed the Terminated message signalling its death)?

+5
2 answers

EDIT: After discussing and testing with the OP, we found that sending a PoisonPill as the means of terminating the watched actor achieves the desired behavior: the Terminated message resulting from a PoisonPill is processed synchronously, while the one resulting from system.stop or Kill is processed asynchronously.

While we are unsure of the reason, our best guess is that killing the actor raises an exception, whereas sending a PoisonPill does not.

Apparently, this has nothing to do with using the gracefulStop pattern from my original suggestion, which I will leave below.

In short, the solution to the OP's problem was simply to terminate the watched actor by sending it a PoisonPill instead of sending a Kill message or calling system.stop.
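To make the fix concrete, here is a minimal sketch of the question's test with the stop call swapped for a PoisonPill. The suite name PoisonPillSpec is made up for illustration, and the synchronous delivery it relies on is the behavior we observed with CallingThreadDispatcher, not something I can point to as a documented guarantee:

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props, Terminated}
import akka.testkit.{TestActorRef, TestKit, TestProbe}
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}

// Same watcher as in the question: it records the last event it has seen.
class WatcherActor(watchee: ActorRef) extends Actor {
  var state = "initial"
  context.watch(watchee)

  override def receive: Receive = {
    case "start"       => state = "start"
    case _: Terminated => state = "terminated"
  }
}

// Hypothetical suite name; only the PoisonPill line differs from the question.
class PoisonPillSpec(_system: ActorSystem)
    extends TestKit(_system) with FunSuiteLike with BeforeAndAfterAll {

  def this() = this(ActorSystem("PoisonPillSpec"))

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  test("watcher has seen Terminated once the PoisonPill send returns") {
    val watchee = TestProbe()
    val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))

    // Instead of system.stop(watchee.ref) or watchee.ref ! Kill:
    watchee.ref ! PoisonPill

    // No Thread.sleep here; this relies on the observed synchronous delivery.
    assert(watcher.underlyingActor.state === "terminated")
  }
}
```

If this assertion ever turns out to be flaky in your setup, that would suggest the observation above does not carry over to your Akka version, so treat it as an experiment and not as a contract.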


The old answer starts here:

Maybe what I'm going to suggest is a little unrelated, but I think it can be applied here.

If I understand correctly, what you want to do is basically terminate an actor synchronously, i.e. do something that returns only once the actor is officially dead and its death has been recorded (in your case, by the watcher).

In general, death notification, like most other things in Akka, is asynchronous. However, you can obtain a synchronous death confirmation using the gracefulStop pattern (akka.pattern.gracefulStop).

For this, the code should look something like this:

    val timeout = 5.seconds
    val killResultFuture = gracefulStop(victimRef, timeout, PoisonPill)
    Await.result(killResultFuture, timeout)

What this does is send a PoisonPill to the victim (note: you can use a custom message instead) and return a future that completes once the victim has died. By using Await.result, you are guaranteed to be synchronous.

Unfortunately, this can only be used if: a) you are actively killing the victim, as opposed to reacting to an external cause of death; and b) you can use timeouts and blocking in your code. But perhaps you can adapt this pattern to your situation.

+3

One way to fix this problem is to introduce another watcher in your test that also watches the watchee. This other watcher is a TestProbe, which lets us perform an assertion on it that gets rid of the timing issues you are seeing. First, the modified test code:

    val watchee = TestProbe()
    val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))
    val probeWatcher = TestProbe()
    probeWatcher watch watchee.ref

    assert(watcher.underlyingActor.state === "initial")

    watcher ! "start" // "start" will be sent and handled by watcher synchronously
    assert(watcher.underlyingActor.state === "start")

    system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
    probeWatcher.expectTerminated(watchee.ref)
    assert(watcher.underlyingActor.state === "terminated")

So you can see that I introduced the additional watcher with the lines:

    val probeWatcher = TestProbe()
    probeWatcher watch watchee.ref

Then, later in the code, before the final assertion that is failing for you, I use another assertion that lets me know that the Terminated message for the stopped actor has been properly distributed:

    probeWatcher.expectTerminated(watchee.ref)

When the code moves past this line, I can be sure that the watcher under test has also received its Terminated message, and the assertion that follows will pass.

EDIT

As noted by the OP, there is still a level of non-determinism with this approach. Another possible solution is to change the line in the test code that stops the actor to:

 watcher.underlyingActor.context.stop(watchee.ref) 

By using the context of the TestActorRef, I believe that the Terminated message will be delivered entirely via the CallingThreadDispatcher and will thus be completely synchronous. I tested this in a loop and it worked for me over 1000 iterations.

Then I thought that, because I was performing the stop using the same actor that was expecting the Terminated message, there might be an optimization for delivering Terminated to self in this scenario, so I also tested this with a completely different actor, as follows:

    class FooActor extends Actor {
      def receive = { case _ => }
    }

Then in the test code:

 val foo = TestActorRef(new FooActor) 

And when stopping:

 foo.underlyingActor.context.stop(watchee.ref) 

It also worked as expected.
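For anyone who wants to repeat the check, the loop I ran looked roughly like the sketch below. The suite name ContextStopSpec is made up for illustration, and WatcherActor is copied from the question. It only demonstrates what I observed; it does not prove that Akka guarantees this ordering:

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}
import akka.testkit.{TestActorRef, TestKit, TestProbe}
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}

// Watcher from the question: records the last event it has seen.
class WatcherActor(watchee: ActorRef) extends Actor {
  var state = "initial"
  context.watch(watchee)

  override def receive: Receive = {
    case "start"       => state = "start"
    case _: Terminated => state = "terminated"
  }
}

// Unrelated actor whose context is only used to issue the stop.
class FooActor extends Actor {
  def receive: Receive = { case _ => }
}

// Hypothetical suite name, used only for this sketch.
class ContextStopSpec(_system: ActorSystem)
    extends TestKit(_system) with FunSuiteLike with BeforeAndAfterAll {

  def this() = this(ActorSystem("ContextStopSpec"))

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  test("watcher has seen Terminated right after context.stop, repeatedly") {
    val foo = TestActorRef(new FooActor)

    for (_ <- 1 to 1000) {
      val watchee = TestProbe()
      val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))

      // Stop through an actor context instead of system.stop.
      foo.underlyingActor.context.stop(watchee.ref)

      // Checked immediately, with no sleep and no extra probe.
      assert(watcher.underlyingActor.state === "terminated")
    }
  }
}
```

Reaching into underlyingActor.context from the test thread is only reasonable here because everything runs on the CallingThreadDispatcher; I would not do this against actors on a regular dispatcher.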

+5

Source: https://habr.com/ru/post/1204947/

