Flink 外化检查点的两个问题 [英] Two questions on Flink externalized checkpoints

查看:38
本文介绍了Flink 外化检查点的两个问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个关于 Flink 外化检查点的问题

I have two questions on Flink externalized checkpoints

(Q1) 我可以在 flink-conf.yaml 中设置state.checkpoints.dir"以使外部检查点正常工作,但是当我从 IDE 运行 flink 时如何实现同样的事情?我尝试了 (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/state-checkpoints-dir-td17921.html)但没有运气.我就是这样做的:

(Q1) I can set "state.checkpoints.dir" in flink-conf.yaml to get externalized checkpoints to work all right, but how do I achieve same thing when I run flink from IDE? I tried the GlobalConfiguration approach mentioned in (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/state-checkpoints-dir-td17921.html) but no luck. This is how I did it:

Configuration cfg =
                GlobalConfiguration.loadConfiguration();
cfg.setString("state.checkpoints.dir", "file:///tmp/checkpoints/state");
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

这是IDE中显示的错误消息:

and this is the error msg show in IDE:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ef7050e2308a4787d983d80f3c07f55c (Long Taxi Rides (checkpointed))
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1325)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:211)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:478)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:291)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
    ... 19 more

Process finished with exit code 1

(Q2) 在检查点的文档中 (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/checkpointing.html),它说这样,你将有如果你的工作失败,一个检查点可以从那里恢复.",取消的工作怎么样?新作业会继续使用现有检查点还是从自己的检查点开始?

(Q2) In the checkpoint's document (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/checkpointing.html), it says "This way, you will have a checkpoint around to resume from if your job fails.", how about the cancelled jobs? will the new job carry on with the existing checkpoint or it will start with its own checkpoint?

推荐答案

您可以控制在取消作业时是否删除外部化检查点.如果你想保留它们,你可以这样做:

You can control whether externalized checkpoints are deleted when the job is cancelled. If you want to retain them, you can do this:

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

有关更多信息,请参阅文档.

For more info, see the docs.

从外部检查点恢复 这样做(与从保存点恢复相同):

To resume from an externalized checkpoint one does this (the same as resuming from a savepoint):

$ bin/flink run -s :checkpointMetaDataPath [:runArgs]

这篇关于Flink 外化检查点的两个问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆