How does Flink treat checkpoints and state within IterativeStream?
Question
I found the following statement:

Flink currently only provides processing guarantees for jobs without
iterations. Enabling checkpointing on an iterative job causes an
exception. In order to force checkpointing on an iterative program the
user needs to set a special flag when enabling checkpointing:
env.enableCheckpointing(interval, force = true).
Please note that records in flight in the loop edges (and the state
changes associated with them) will be lost during failure.

Does this refer to iterations within batch jobs, to iterative streams, or to both?

If it refers to iterative streams, what state of the following operators would be available in the event of failure? (The example is taken from this conversation about sharing state across operators using a ConnectedIterativeStreams and terminating the iteration with .closeWith(stream.broadcast()).)

    DataStream<Point> input = ...
    // Open an iteration whose feedback edge carries Centroids
    ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids =
        input.iterate().withFeedbackType(Centroids.class);
    DataStream<Centroids> updatedCentroids = inputsAndCentroids.flatMap(new MyCoFlatmap());
    // Close the loop: broadcast the updated centroids back to the iteration head
    inputsAndCentroids.closeWith(updatedCentroids.broadcast());

    class MyCoFlatmap implements CoFlatMapFunction<Point, Centroids, Centroids> {...}

Would there be any change if MyCoFlatmap were a CoProcessFunction instead of a CoFlatMapFunction (meaning it could hold state too)?

Recommended answer

The limitation applies only to Flink's DataStream/Streaming API when using iterations. When using the DataSet/Batch API, there are no limitations.

With streaming iterations you do not actually lose operator state, but you might lose records that an operator has sent back to the iteration head via the loop edge. In your example, records sent from updatedCentroids to inputsAndCentroids might be lost in case of a failure. Hence, Flink cannot guarantee exactly-once processing in this case.

There is a Flink improvement proposal that addresses this shortcoming. However, it has not been finished yet.
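
To make the failure semantics above more concrete, here is a minimal toy model in plain Java. It is not Flink code: the class LoopEdgeLossDemo and its methods are hypothetical names invented for illustration, and it assumes that a checkpoint snapshots only the operator state, not the records queued on the loop edge. Under that assumption, recovery restores the state but silently drops whatever was still in flight on the feedback edge.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: none of these names are Flink APIs.
class LoopEdgeLossDemo {
    // Operator state that a checkpoint snapshots (here: a running sum).
    int operatorState = 0;
    // Records travelling on the loop edge back to the iteration head.
    final Deque<Integer> loopEdge = new ArrayDeque<>();
    // Last completed snapshot; loop-edge records are deliberately NOT part of it.
    int checkpointedState = 0;

    // Process one record: update the state and emit a feedback record.
    void process(int record) {
        operatorState += record;
        loopEdge.addLast(operatorState);
    }

    // Snapshot the operator state only (assumed behaviour of the toy model).
    void checkpoint() {
        checkpointedState = operatorState;
    }

    // Simulate failure and recovery; returns the number of lost in-flight records.
    int failAndRecover() {
        int lost = loopEdge.size();
        loopEdge.clear();                  // in-flight feedback records vanish
        operatorState = checkpointedState; // operator state is restored
        return lost;
    }

    public static void main(String[] args) {
        LoopEdgeLossDemo job = new LoopEdgeLossDemo();
        job.process(1);
        job.process(2);
        job.checkpoint(); // state 3 is snapshotted while 2 feedback records are in flight
        int lost = job.failAndRecover();
        System.out.println("restored state = " + job.operatorState);
        System.out.println("lost feedback records = " + lost);
    }
}
```

In this sketch the restored state is intact, but any further state changes that the two dropped feedback records would have triggered never happen, which is why exactly-once results cannot be promised for the loop.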