Two questions on Flink externalized checkpoints
Question
I have two questions about Flink externalized checkpoints.
(Q1) I can set "state.checkpoints.dir" in flink-conf.yaml and externalized checkpoints work fine, but how do I achieve the same thing when I run Flink from an IDE? I tried the GlobalConfiguration approach mentioned in (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/state-checkpoints-dir-td17921.html), but with no luck. This is what I did:
Configuration cfg = GlobalConfiguration.loadConfiguration();
cfg.setString("state.checkpoints.dir", "file:///tmp/checkpoints/state");
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
And this is the error message shown in the IDE:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ef7050e2308a4787d983d80f3c07f55c (Long Taxi Rides (checkpointed))
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1325)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:211)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:478)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:291)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
... 19 more
Process finished with exit code 1
(Q2) The checkpointing documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/checkpointing.html) says "This way, you will have a checkpoint around to resume from if your job fails." What about cancelled jobs? Will a new job carry on from the existing checkpoint, or will it start with its own checkpoints?
Recommended answer
You can control whether externalized checkpoints are deleted when the job is cancelled. If you want to retain them, you can do this:
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
For more info, see the docs.
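Retained checkpoints also need a target directory. The IllegalStateException in the question is raised when externalized checkpoints are enabled but 'state.checkpoints.dir' is not set. For a cluster deployment, as the question notes, the key can be set in flink-conf.yaml. A minimal fragment, reusing the path from the question purely as an example:

```yaml
# flink-conf.yaml
# Target directory for externalized checkpoint metadata.
# The path is the example value from the question, not a recommendation.
state.checkpoints.dir: file:///tmp/checkpoints/state
```

This only covers jobs submitted to a cluster that reads flink-conf.yaml. It does not by itself settle how to pass the setting when running from an IDE (Q1).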
To resume from an externalized checkpoint, do the following (the same as resuming from a savepoint):
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]