Cancel Apache Flink Job from Code

I want to stop/cancel a Flink job from code. This happens in my integration test: I submit a job to Flink and then check the result. Because the job runs asynchronously, it keeps running after the test has passed or failed. I want it to stop once the test completes.

I tried the following:

  • Get the JobManager actor.
  • Get the running jobs.
  • For each running job, send a cancel request to the JobManager.

This doesn't work, and I'm not sure whether my JobManager actor reference is wrong or something else is missing.

The error I get:

[flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[akka://flink/temp/$a] to Actor[akka://flink/user/jobmanager_1] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

This suggests that either my reference to the JobManager actor is wrong or the message I send to it is wrong.
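One way to tell these two cases apart is to check whether the actor selection resolves at all. Akka answers its built-in `Identify` message on behalf of the target: the reply is `ActorIdentity(id, Some(ref))` if an actor lives at that path and `ActorIdentity(id, None)` if nothing does. The sketch below assumes only Akka on the classpath; `SelectionCheck` and `resolve` are hypothetical names chosen for illustration, not part of Flink or Akka.

```scala
import akka.actor.{ActorIdentity, ActorRef, ActorSystem, Identify}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

object SelectionCheck {

  /** Returns Some(ref) if an actor exists at `path` inside `system`, None otherwise.
    * Identify is handled by Akka itself, so this works even for actors
    * whose own receive logic ignores unknown messages. */
  def resolve(system: ActorSystem, path: String, timeout: FiniteDuration = 3.seconds): Option[ActorRef] = {
    implicit val askTimeout: Timeout = Timeout(timeout)
    val reply = system.actorSelection(path) ? Identify(path)
    Await.result(reply, timeout) match {
      case ActorIdentity(_, ref) => ref
      case other                 => throw new IllegalStateException(s"Unexpected reply: $other")
    }
  }
}
```

If `SelectionCheck.resolve(system, "/user/jobmanager_1")` returns `None` for the `ActorSystem` created in the test, the path does not point at any actor in that system, and every message sent to it ends up as a dead letter regardless of its content.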

The code is as follows:

```scala
import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.pattern.Patterns
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{CancelJob, RunningJobsStatus}

import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) // I debugged to get this path
val jobManager = system.actorSelection("/user/jobmanager_1") // also got this Akka path by debugging and reading the JobManager's Akka URL
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus,
  new Timeout(new FiniteDuration(10000, TimeUnit.MILLISECONDS)))
try {
  Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS)) match {
    case status: RunningJobsStatus =>
      // Ask the JobManager to cancel every job it reports as running
      for (job <- status.getStatusMessages().asScala) {
        val jobId = job.getJobId
        val killResponse = Patterns.ask(jobManager, new CancelJob(jobId),
          new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)))
        try {
          Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
        } catch {
          case e: Exception => println(s"Canceling the job with ID $jobId failed: $e")
        }
      }
    case other => println(s"Unexpected reply from the JobManager: $other")
  }
} catch {
  case e: Exception => println(s"Could not retrieve running jobs from the JobManager: $e")
}
```

Can anyone check if this is correct?

EDIT: To stop the job completely, you have to stop both the TaskManager and the JobManager, in that order: first the TaskManager, then the JobManager.


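You are creating a new ActorSystem and then looking up the actor /user/jobmanager_1 in that same actor system. This cannot work, because the actual JobManager runs in a different ActorSystem.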

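To get an ActorRef to the real JobManager, you either have to do the selection in the same ActorSystem the JobManager runs in (then a local path works), or you have to use the remote address of the JobManager actor, which has the format akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]. If you have access to the FlinkMiniCluster, you can use its leaderGateway to obtain the current leader's ActorGateway.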
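Following the FlinkMiniCluster suggestion, here is a minimal sketch of the cancel loop from the question rewritten against the leader's ActorGateway instead of a hand-built actor selection. It assumes the Flink 1.x actor-based runtime, where `FlinkMiniCluster.getLeaderGateway(FiniteDuration)` returns the leader's `ActorGateway` and `ActorGateway.ask(message, timeout)` returns a Scala `Future`; `CancelRunningJobs` and `cancelAll` are hypothetical names, and the exact signatures should be checked against the Flink version in use.

```scala
import org.apache.flink.runtime.instance.ActorGateway
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{CancelJob, CancellationFailure, RunningJobsStatus}
import org.apache.flink.runtime.minicluster.FlinkMiniCluster

import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration._

object CancelRunningJobs {

  /** Cancels every job that the current leading JobManager reports as running. */
  def cancelAll(cluster: FlinkMiniCluster, timeout: FiniteDuration = 10.seconds): Unit = {
    // The gateway is obtained from the cluster itself, so messages reach the
    // JobManager in the cluster's own ActorSystem rather than a new one.
    val gateway: ActorGateway = cluster.getLeaderGateway(timeout)

    Await.result(gateway.ask(JobManagerMessages.getRequestRunningJobsStatus, timeout), timeout) match {
      case status: RunningJobsStatus =>
        for (job <- status.getStatusMessages().asScala) {
          val jobId = job.getJobId
          Await.result(gateway.ask(new CancelJob(jobId), timeout), timeout) match {
            case failure: CancellationFailure =>
              throw new IllegalStateException(s"Cancelling job $jobId failed", failure.cause)
            case _ => // any other reply (e.g. CancellationSuccess) is treated as success
          }
        }
      case other =>
        throw new IllegalStateException(s"Unexpected reply from the JobManager: $other")
    }
  }
}
```

In an integration test this would typically be called from the teardown step, before the cluster itself is stopped.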


Source: https://habr.com/ru/post/1669279/

