Canceling an Apache Flink job from code

Question

I want to stop or cancel a Flink job from code. This is for an integration test in which I submit a task to my Flink job and check the result. Because the job runs asynchronously, it keeps running whether the test passes or fails. I want to stop the job once the test is over.

I tried the following:

  1. Get the JobManager actor
  2. Get the running jobs
  3. For each running job, send a cancel request to the JobManager

This does not work, but I am not sure whether the JobManager ActorRef is wrong or something else is missing.

The error I get is:

[flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[akka://flink/temp/$a] to Actor[akka://flink/user/jobmanager_1] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'

This means that either the JobManager ActorRef is wrong or the message sent to it is incorrect.

The code looks like this:

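import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.pattern.Patterns
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{CancelJob, RunningJobsStatus}
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

// I debugged to get this path
val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka"))
// Also got this Akka path by debugging and reading the JobManager's Akka URL
val jobManager = system.actorSelection("/user/jobmanager_1")
// Ask the JobManager for the status of all running jobs
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus,
  new Timeout(new FiniteDuration(10000, TimeUnit.MILLISECONDS)))
try {
  val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
  result match {
    case status: RunningJobsStatus =>
      val itr = status.getStatusMessages().iterator()
      while (itr.hasNext) {
        val jobId = itr.next().getJobId
        // Send a cancel request for each running job and wait for the reply
        val killResponse = Patterns.ask(jobManager, new CancelJob(jobId),
          new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)))
        try {
          Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
        } catch {
          case e: Exception => println("Canceling the job with ID " + jobId + " failed. " + e)
        }
      }
    case _ => // Unexpected reply type: nothing to cancel
  }
} catch {
  case e: Exception => println("Could not retrieve running jobs from the JobManager. " + e)
}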

Can someone check whether this is the correct approach?

EDIT: To stop the job completely, the TaskManager must be stopped along with the JobManager, in that order: TaskManager first, then JobManager.

Recommended answer

You are creating a new ActorSystem and then trying to find an actor named /user/jobmanager_1 in that same actor system. This won't work, because the actual JobManager runs in a different ActorSystem.

If you want to obtain an ActorRef to the real JobManager, you either have to use the same ActorSystem for the selection (then you can use a local address) or you have to find out the remote address of the JobManager actor. The remote address has the format akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]. If you have access to the FlinkMiniCluster, you can use the leaderGateway promise to obtain the current leader's ActorGateway.
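As a rough illustration of the remote address format described in the answer, the following minimal sketch builds such a path from its parts. The object name JobManagerPath and the method remote are hypothetical helpers, not part of Flink or Akka. The sketch also assumes that [address_of_actor_system] takes the form host:port, and the values localhost, 6123, and 1 are example inputs only.

```scala
// Hypothetical helper (not part of Flink or Akka): builds the remote Akka
// path of a JobManager actor from the format quoted in the answer:
//   akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]
object JobManagerPath {

  // Assumption: the actor system address is written as host:port.
  def remote(host: String, port: Int, instanceNumber: Int): String =
    s"akka.tcp://flink@$host:$port/user/jobmanager_$instanceNumber"

  def main(args: Array[String]): Unit = {
    // Example values only; substitute the host, port, and instance number
    // that your own cluster actually reports.
    println(remote("localhost", 6123, 1))
  }
}
```

The resulting string would typically be passed to actorSelection on an ActorSystem that has Akka remoting enabled, rather than looking up the local path /user/jobmanager_1 in a freshly created actor system.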
