I want to stop / cancel a Flink job from code. This is in an integration test, where I submit a job to Flink and check the result. Because the job runs asynchronously, it keeps running whether the test fails or passes. I want to stop the job once the test completes.
I tried a few things that are listed below:
- Get the JobManager actor.
- Get the running jobs.
- For each running job, send a cancel request to the JobManager.
This, of course, does not work, but I'm not sure whether the JobManager actor reference is wrong or something else is missing.
The error I get: [flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[akka://flink/temp/$a] to Actor[akka://flink/user/jobmanager_1] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
which means that either the JobManager actor reference is wrong or the message sent to it is incorrect.
The code is as follows:
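import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.pattern.Patterns
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{CancelJob, RunningJobsStatus}
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka"))
val jobManager = system.actorSelection("/user/jobmanager_1")
// Ask the JobManager for the status of all running jobs.
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
try {
  val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
  if (result.isInstanceOf[RunningJobsStatus]) {
    val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
    val itr = runningJobs.iterator()
    // Send a cancel request for every running job and wait for the reply.
    while (itr.hasNext) {
      val jobId = itr.next().getJobId
      val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)))
      try {
        Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
      } catch {
        // Print the failure; a bare string expression here would be silently discarded.
        case e: Exception => println("Canceling the job with ID " + jobId + " failed. " + e)
      }
    }
  }
} catch {
  case e: Exception => println("Could not retrieve running jobs from the JobManager. " + e)
}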
Can anyone check if this is correct?
EDIT: To stop the job completely, you have to stop the TaskManager as well as the JobManager, in this order: first the TaskManager, then the JobManager.
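For anyone wiring this into a test, here is a minimal sketch of that ordering, run in a `finally` so that teardown happens whether the test passes or fails. It uses only the Scala standard library. `Stoppable`, `OrderedShutdown`, `stopInOrder` and `withCluster` are hypothetical names made up for illustration, not Flink or Akka API; you would wrap whatever handles your test actually has for the TaskManager and the JobManager.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for anything that can be shut down; not a Flink type.
trait Stoppable {
  def name: String
  def stop(): Unit
}

object OrderedShutdown {
  // Stops each component in the order given. A failure in one component is
  // printed and does not prevent the remaining components from being stopped.
  // Returns the names of the components that stopped without an exception.
  def stopInOrder(components: Seq[Stoppable]): List[String] = {
    val stopped = ListBuffer.empty[String]
    components.foreach { c =>
      try {
        c.stop()
        stopped += c.name
      } catch {
        case NonFatal(e) => println(s"Stopping ${c.name} failed: $e")
      }
    }
    stopped.toList
  }

  // Runs the test body, then always tears down: TaskManager first, JobManager second.
  def withCluster[A](taskManager: Stoppable, jobManager: Stoppable)(body: => A): A =
    try body
    finally {
      stopInOrder(Seq(taskManager, jobManager))
      ()
    }
}
```

In a test you would put the job submission and the assertions inside the `withCluster { ... }` block; an exception thrown by a failed assertion still propagates after both components have been asked to stop.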