从代码中取消 Apache Flink 作业 [英] Canceling Apache Flink job from the code
问题描述
我想从代码中停止/取消 flink 作业.这是在我的集成测试中,我向我的 flink 作业提交任务并检查结果.当作业异步运行时,即使测试失败/通过,它也不会停止.我想在测试结束后停止工作.
I am in a situation where I want to stop/cancel the flink job from the code. This is in my integration test where I am submitting a task to my flink job and check the result. As the job runs, asynchronously, it doesn't stop even when the test fails/passes. I want to job the stop after the test is over.
我尝试了一些我在下面列出的东西:
I tried a few things which I am listing below :
- 获取 jobmanager 演员
- 获取正在运行的作业
- 对于每个正在运行的作业,向作业管理器发送取消请求
这当然没有运行,但我不确定 jobmanager actorref 是错误的还是缺少其他东西.
This, of course in not running but I am not sure whether the jobmanager actorref is wrong or something else is missing.
我得到的错误是:[flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] 消息 [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$]从 Actor[akka://flink/temp/$a] 到 Actor[akka://flink/user/jobmanager_1] 未交付.[1] 遇到死信.可以使用配置设置akka.log-dead-letters"和akka.log-dead-letters-during-shutdown"关闭或调整此日志记录
The error I get is : [flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[akka://flink/temp/$a] to Actor[akka://flink/user/jobmanager_1] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'
这意味着要么作业管理器 actor ref 错误,要么发送给它的消息不正确.
which means either the job manager actor ref is wrong or the message sent to it is incorrect.
代码如下所示:
val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
try {
val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
if(result.isInstanceOf[RunningJobsStatus]){
val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
val itr = runningJobs.iterator()
while(itr.hasNext){
val jobId = itr.next().getJobId
val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
try {
Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
}
catch {
case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
}
}
}
}
catch{
case e : Exception => "Could not retrieve running jobs from the JobManager." + e
}
}
有人可以检查这是否是正确的方法吗?
Can someone check if this is the correct approach ?
要完全停止Job,需要按照先TaskManager 再JobManager 的顺序停止TaskManager 和JobManager.
EDIT : To completely stop the job, it is necessary to stop the TaskManager along with the JobManager in the order TaskManager first and then JobManager.
推荐答案
您正在创建一个新的 ActorSystem
,然后尝试查找名为 /user/jobmanager_1代码>在同一个actor系统中.这是行不通的,因为实际的作业管理器将在不同的
ActorSystem
中运行.
You're creating a new ActorSystem
and then try to find an actor with the name /user/jobmanager_1
in the same actor system. This won't work, since the actual job manager will run in a different ActorSystem
.
如果你想获得一个 ActorRef
给真正的作业管理器,你要么必须使用相同的 ActorSystem
进行选择(然后你可以使用本地地址)或者您已经找到了作业管理器参与者的远程地址.远程地址的格式为 akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]
.如果您有权访问FlinkMiniCluster
,那么您可以使用leaderGateway
承诺获取当前领导者的ActorGateway
.
If you want to obtain an ActorRef
to the real job manager, you either have to use the same ActorSystem
for the selection (then you can use a local address) or you have find out the remote address for the job manager actor. The remote address has the format akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]
. If you have access to the FlinkMiniCluster
then you can use the leaderGateway
promise to obtain the current leader's ActorGateway
.
这篇关于从代码中取消 Apache Flink 作业的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!