Canceling an Apache Flink job from the code

Problem description

I am in a situation where I want to stop/cancel a Flink job from the code. This is in my integration test, where I submit a task to my Flink job and check the result. Because the job runs asynchronously, it doesn't stop even when the test fails or passes. I want the job to stop after the test is over.

I tried a few things, which I list below:


  1. Get the JobManager actor

  2. Get the running jobs

  3. For each running job, send a cancel request to the JobManager

This, of course, is not working, but I am not sure whether the JobManager ActorRef is wrong or something else is missing.

The error I get is:

[flink-akka.actor.default-dispatcher-5] [akka://flink/user/jobmanager_1] Message [org.apache.flink.runtime.messages.JobManagerMessages$RequestRunningJobsStatus$] from Actor[akka://flink/temp/$a] to Actor[akka://flink/user/jobmanager_1] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'

The code looks like this:

import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.pattern.Patterns
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{CancelJob, RunningJobsStatus}
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) // I debugged to get this path
// also got this akka path by debugging and getting the jobmanager akka url
val jobManager = system.actorSelection("/user/jobmanager_1")
// ask the JobManager which jobs are currently running
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new Timeout(new FiniteDuration(10000, TimeUnit.MILLISECONDS)))
try {
  Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS)) match {
    case status: RunningJobsStatus =>
      val itr = status.getStatusMessages().iterator()
      while (itr.hasNext) {
        val jobId = itr.next().getJobId
        // send a cancel request for each running job and wait for the reply
        val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)))
        try {
          Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
        } catch {
          case e: Exception => println("Canceling the job with ID " + jobId + " failed. " + e)
        }
      }
    case _ => // any other reply: nothing to cancel
  }
} catch {
  case e: Exception => println("Could not retrieve running jobs from the JobManager. " + e)
}

Can someone check whether this is the correct approach?

EDIT: To stop the job completely, it is necessary to stop the TaskManager as well as the JobManager, in that order: the TaskManager first and then the JobManager.

Recommended answer

You're creating a new ActorSystem and then trying to find an actor with the name /user/jobmanager_1 in that same actor system. This won't work, since the actual job manager runs in a different ActorSystem.
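To illustrate the point, here is a minimal sketch that uses plain Akka classic actors and no Flink at all. It assumes Akka 2.4 or later is on the classpath; the names ActorSystemIsolation, Echo, and canResolve are hypothetical and only serve the illustration. Two actor systems are created with the same name, an actor is registered in the first one, and the same local path is then looked up in both.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object ActorSystemIsolation {
  // Hypothetical stand-in for the job manager actor.
  class Echo extends Actor {
    def receive: Receive = { case msg => sender() ! msg }
  }

  // True if an actor can be resolved at `path` inside `system`.
  def canResolve(system: ActorSystem, path: String): Boolean =
    Try(Await.result(system.actorSelection(path).resolveOne(2.seconds), 3.seconds)).isSuccess

  def main(args: Array[String]): Unit = {
    // Same name, but two independent actor systems.
    val systemA = ActorSystem("flink")
    val systemB = ActorSystem("flink")
    try {
      // The actor only exists in systemA.
      systemA.actorOf(Props(new Echo), "jobmanager_1")
      println("resolved in systemA: " + canResolve(systemA, "/user/jobmanager_1"))
      println("resolved in systemB: " + canResolve(systemB, "/user/jobmanager_1"))
    } finally {
      systemA.terminate()
      systemB.terminate()
    }
  }
}
```

The lookup should succeed only in systemA. Messages sent to the same selection in systemB have no recipient, which is the situation the dead-letters log in the question describes.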

If you want to obtain an ActorRef to the real job manager, you either have to use the same ActorSystem for the selection (then you can use a local address) or you have to find out the remote address of the job manager actor. The remote address has the format akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]. If you have access to the FlinkMiniCluster, you can use the leaderGateway promise to obtain the current leader's ActorGateway.
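The two options can be sketched roughly as follows. This is an assumption-laden sketch, not a verified implementation: it assumes an older Flink release whose Akka-based FlinkMiniCluster exposes getLeaderGateway(timeout) and whose ActorGateway offers ask(message, timeout), so check both against the Flink version you actually use. The object CancelRunningJobs and its methods remoteJobManagerPath and cancelAll are hypothetical names introduced here.

```scala
import org.apache.flink.runtime.instance.ActorGateway
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{CancelJob, RunningJobsStatus}
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration._

object CancelRunningJobs {
  // Builds a remote path in the format given in the answer.
  // `address` is the actor system's address, for example "host:port".
  def remoteJobManagerPath(address: String, instance: Int): String =
    s"akka.tcp://flink@$address/user/jobmanager_$instance"

  // Asks the current leader for its running jobs and cancels each of them.
  // Assumption: getLeaderGateway and ActorGateway.ask exist with these shapes.
  def cancelAll(cluster: FlinkMiniCluster, timeout: FiniteDuration = 10.seconds): Unit = {
    val gateway: ActorGateway = cluster.getLeaderGateway(timeout)
    val reply = Await.result(
      gateway.ask(JobManagerMessages.getRequestRunningJobsStatus, timeout), timeout)
    reply match {
      case status: RunningJobsStatus =>
        status.getStatusMessages().asScala.foreach { job =>
          // Wait for the job manager's answer to each cancel request.
          Await.result(gateway.ask(CancelJob(job.getJobId), timeout), timeout)
        }
      case other =>
        println("Unexpected reply from the job manager: " + other)
    }
  }
}
```

In an integration test, cancelAll(cluster) could be called from the teardown step so that jobs are cancelled whether the test passes or fails. The remote path helper is only needed when the test has no handle on the cluster and must select the job manager through a remoting-enabled ActorSystem.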
