Akka and clustering: problems with ClusterSingletonManager, "unhandled event in state Start"

I have a system that uses Akka 2.2.4. It creates a bunch of local actors and sets them up as the routees of a Broadcast Router. Each worker handles a segment of the overall work, according to a hash range that we pass to it. It works great.

Now I have to cluster this application for failover. Given the requirement that only one worker per hash range may exist or run on the cluster, it seems to me that setting each one up as a cluster singleton through ClusterSingletonManager would make sense, but I am having trouble getting it to work. The actor system starts up, it creates the ClusterSingletonManager, and it adds the path from the code below to a Broadcast Router, but for some reason my actual worker actor never gets to process my messages. All I get is a log message: "unhandled event ${my message} in state Start". What am I doing wrong? Is there anything else I need to do to start this single-instance cluster? Am I sending the message to the wrong actor?

Here is my Akka configuration (I use the default configuration as a fallback):

akka{
    cluster{
        roles=["workerSystem"]
        min-nr-of-members = 1
        role {
            workerSystem.min-nr-of-members = 1
        }
    }
    daemonic = true
    remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = ${akkaPort}
        }
    }
    actor{
        provider = akka.cluster.ClusterActorRefProvider
        single-message-bound-mailbox {
              # FQCN of the MailboxType. The Class of the FQCN must have a public
              # constructor with
              # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
              mailbox-type = "akka.dispatch.BoundedMailbox"

              # If the mailbox is bounded then it uses this setting to determine its
              # capacity. The provided value must be positive.
              # NOTICE:
              # Up to version 2.1 the mailbox type was determined based on this setting;
              # this is no longer the case, the type must explicitly be a bounded mailbox.
              mailbox-capacity = 1

              # If the mailbox is bounded then this is the timeout for enqueueing
              # in case the mailbox is full. Negative values signify infinite
              # timeout, which should be avoided as it bears the risk of dead-lock.
              mailbox-push-timeout-time = 1

        }
        worker-dispatcher{
         type = PinnedDispatcher
         executor = "thread-pool-executor"
          # Throughput defines the number of messages that are processed in a batch
          # before the thread is returned to the pool. Set to 1 for as fair as possible.
         throughput = 500
         thread-pool-executor {
            # Keep alive time for threads
            keep-alive-time = 60s

            # Min number of threads to cap factor-based core number to
            core-pool-size-min = ${workerCount}

            # The core pool size factor is used to determine thread pool core size
            # using the following formula: ceil(available processors * factor).
            # Resulting size is then bounded by the core-pool-size-min and
            # core-pool-size-max values.
            core-pool-size-factor = 3.0

            # Max number of threads to cap factor-based number to
            core-pool-size-max = 64

            # Minimum number of threads to cap factor-based max number to
            # (if using a bounded task queue)
            max-pool-size-min = ${workerCount}

            # Max no of threads (if using a bounded task queue) is determined by
            # calculating: ceil(available processors * factor)
            max-pool-size-factor  = 3.0

            # Max number of threads to cap factor-based max number to
            # (if using a  bounded task queue)
            max-pool-size-max = 64

            # Specifies the bounded capacity of the task queue (< 1 == unbounded)
            task-queue-size = -1

            # Specifies which type of task queue will be used, can be "array" or
            # "linked" (default)
            task-queue-type = "linked"

            # Allow core threads to time out
            allow-core-timeout = on
          }
         fork-join-executor {
            # Min number of threads to cap factor-based parallelism number to
            parallelism-min = 1

            # The parallelism factor is used to determine thread pool size using the
            # following formula: ceil(available processors * factor). Resulting size
            # is then bounded by the parallelism-min and parallelism-max values.
            parallelism-factor = 3.0

            # Max number of threads to cap factor-based parallelism number to
            parallelism-max = 1
          }
        }
    }
}

Here is where I create my actors (it's written in Groovy):

            Props clusteredProps = ClusterSingletonManager.defaultProps("worker".toString(), PoisonPill.getInstance(), "workerSystem",
                    new ClusterSingletonPropsFactory(){

                        @Override
                        Props create(Object handOverData) {
                            log.info("called in ClusterSingetonManager")
                            Props.create(WorkerActorCreator.create(applicationContext, it.start, it.end)).withDispatcher("akka.actor.worker-dispatcher").withMailbox("akka.actor.single-message-bound-mailbox")
                        }
                    } )
            ActorRef manager = system.actorOf(clusteredProps, "worker-${it.start}-${it.end}".toString())
            String path = manager.path().child("worker").toString()
            path

When I try to send a message to the actual worker actor, should the path above resolve to it? Currently it does not. What am I doing wrong? In addition, these actors live in a Spring application, and the worker actors are configured with some @Autowired dependencies. This Spring integration worked well in a non-clustered environment; are there any issues in a clustered environment that I should look out for?

Thank you

FYI: I also posted this to the akka-user Google group. Here is the link.

1 answer

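The ClusterSingletonManager is started on every node that has the given role. It creates the child actor (WorkerActor) for "worker-${it.start}-${it.end}" on the oldest node only, i.e. as a singleton within the cluster.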

You should also give the ClusterSingletonManager itself a name, for example system.actorOf(clusteredProps, "workerSingletonManager").

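Do not send your messages to the ClusterSingletonManager. Send them to the path of the active worker, which includes the address of the node it is currently running on. The ConsumerProxy in the documentation illustrates this.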
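
For reference, the full path of a singleton is made up of the address of the node hosting it, the `/user` guardian, the name of the manager, and the singleton name. A minimal sketch in plain Java of how such a path string is composed follows. `SingletonPaths` is a hypothetical helper and not part of Akka, and the system name `workers`, the port `2551`, and the hash range `0-99` are made-up values for illustration only.

```java
// Hypothetical helper, NOT an Akka API: composes the full path string of a
// singleton actor that runs as a child of a ClusterSingletonManager.
final class SingletonPaths {
    private SingletonPaths() {}

    /**
     * @param nodeAddress   address of the node hosting the singleton,
     *                      e.g. "akka.tcp://workers@127.0.0.1:2551" (assumed values)
     * @param managerName   name given to the manager in system.actorOf(props, name)
     * @param singletonName singleton name configured on the ClusterSingletonManager
     */
    static String singletonPath(String nodeAddress, String managerName, String singletonName) {
        // Top-level actors created with system.actorOf live under the /user guardian.
        return nodeAddress + "/user/" + managerName + "/" + singletonName;
    }

    public static void main(String[] args) {
        // Made-up address and hash range, for illustration only.
        String path = singletonPath("akka.tcp://workers@127.0.0.1:2551", "worker-0-99", "worker");
        System.out.println(path);
    }
}
```

In a real application the node address would not be hard-coded. It would come from cluster membership events, and the resulting string could then be passed to `actorSelection`.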

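That said, I am not sure a singleton is the right fit here: all of the workers would end up running on the same node, the oldest one. I would prefer to discuss alternative solutions on the akka-user Google group.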


Source: https://habr.com/ru/post/1532750/
