Flink checkpoints keep failing

Problem description

We are trying to set up a stateful Flink job using the RocksDB backend. We use session windows with a 30-minute gap and an AggregateFunction, so we are not using any Flink state variables directly. Based on sampling, we have fewer than 20k events/s and 20-30 new sessions/s. Our sessions basically gather all the events, so the size of a session accumulator grows over time. We are using 10 GB of memory in total, with Flink 1.9 and 128 containers. The settings are as follows:

state.backend: rocksdb
state.checkpoints.dir: hdfs://nameservice0/myjob/path
state.backend.rocksdb.memory.managed: true
state.backend.incremental: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.4
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1

containerized.heap-cutoff-ratio: 0.45
taskmanager.network.memory.fraction: 0.5
taskmanager.network.memory.min: 512mb
taskmanager.network.memory.max: 2560mb
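
For reference, here is a minimal sketch of the kind of job described above. The names Event, getSessionKey, and CollectAllEvents are illustrative only, not from the original post; the accumulator simply collects every event into a list, which is what the question describes.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Inside the job's main() method: key by session, 30-minute session gap,
// collect all events per session via the AggregateFunction below.
DataStream<Event> events = ...;  // source elided

events
    .keyBy(e -> e.getSessionKey())
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new CollectAllEvents());

// Accumulator that gathers every event, so it grows for the lifetime of the session.
public class CollectAllEvents implements AggregateFunction<Event, List<Event>, List<Event>> {

    @Override
    public List<Event> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<Event> add(Event value, List<Event> acc) {
        acc.add(value);
        return acc;
    }

    @Override
    public List<Event> getResult(List<Event> acc) {
        return acc;
    }

    @Override
    public List<Event> merge(List<Event> a, List<Event> b) {
        a.addAll(b);
        return a;
    }
}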

From our monitoring at a given point in time, the RocksDB memtable size is less than 10 MB and our heap usage is under 1 GB, but our direct memory usage (network buffers) is at 2.5 GB. The buffer pool / buffer usage metrics are all at 1 (full). Our checkpoints keep failing, and I wonder whether it is normal for the network buffer part to use up this much memory.

I'd really appreciate it if you can give some suggestions :) Thank you!

Recommended answer

For what it's worth, session windows do use Flink state internally. (So do most sources and sinks.) Depending on how you are gathering the session events into the session accumulator, this could be a performance problem. If you need to gather all of the events together, why are you doing this with an AggregateFunction, rather than having Flink do this for you?
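
As a rough illustration of that alternative, reusing the hypothetical Event type from the sketch above (SessionResult and buildResult are also made-up names): applying a ProcessWindowFunction with no pre-aggregation lets Flink's internal list state do the collecting, and RocksDB only appends each serialized event to that list.

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

events
    .keyBy(e -> e.getSessionKey())
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .process(new ProcessWindowFunction<Event, SessionResult, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx,
                            Iterable<Event> allEvents, Collector<SessionResult> out) {
            // allEvents is backed by Flink's internal list state and is
            // deserialized lazily, in chunks, when the session window fires.
            out.collect(buildResult(key, allEvents));
        }
    });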

For the best windowing performance, you want to use a ReduceFunction or an AggregateFunction that incrementally reduces/aggregates the window, keeping only a small bit of state that will ultimately be the result of the window. If, on the other hand, you use only a ProcessWindowFunction without pre-aggregation, then Flink will internally use an appending list state object that is very efficient when used with RocksDB: it only has to serialize each event in order to append it to the end of the list. When the window is ultimately triggered, the list is delivered to you as an Iterable that is deserialized in chunks. If, instead, you roll your own solution with an AggregateFunction, you may have RocksDB deserializing and reserializing the accumulator on every access/update. This can become very expensive, and may explain why the checkpoints are failing.
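
A minimal sketch of the incremental style, assuming the window result can be reduced to a small summary (SessionSummary, count, totalAmount, and getAmount are hypothetical names): the accumulator stays tiny, so RocksDB only ever serializes and deserializes a few bytes per update.

// Small POJO used as both the accumulator and the window result.
public class SessionSummary {
    public long count;
    public double totalAmount;
}

public class SessionSummaryAggregate
        implements AggregateFunction<Event, SessionSummary, SessionSummary> {

    @Override
    public SessionSummary createAccumulator() {
        return new SessionSummary();
    }

    @Override
    public SessionSummary add(Event value, SessionSummary acc) {
        acc.count += 1;
        acc.totalAmount += value.getAmount();  // getAmount() is a made-up accessor
        return acc;
    }

    @Override
    public SessionSummary getResult(SessionSummary acc) {
        return acc;
    }

    @Override
    public SessionSummary merge(SessionSummary a, SessionSummary b) {
        a.count += b.count;
        a.totalAmount += b.totalAmount;
        return a;
    }
}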

Another interesting fact you've shared is that the buffer pool / buffer usage metrics show that they are fully utilized. This is an indication of significant backpressure, which in turn would explain why the checkpoints are failing. Checkpointing relies on the checkpoint barriers being able to traverse the entire execution graph, checkpointing each operator as they go, and completing a full sweep of the job before timing out. With backpressure, this can fail.
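
Not from the original answer, but for context, these are the checkpoint settings typically worth reviewing when barriers are being slowed down by backpressure. The values below are placeholders, not recommendations.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5 * 60 * 1000);                              // trigger a checkpoint every 5 minutes
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);      // give barriers more time before a checkpoint is declared failed
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);  // let the job drain between attempts
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);            // avoid overlapping checkpoints piling up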

The most common cause of backpressure is under-provisioning, or in other words, overwhelming the cluster. The network buffer pools become fully utilized because the operators can't keep up. The answer is not to increase buffering, but to remove/fix the bottleneck.
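
One possible remedy, sketched under the assumption that the windowed aggregation itself is the bottleneck: give the heavy operator more parallel instances rather than more network buffers. The parallelism value here is purely illustrative.

events
    .keyBy(e -> e.getSessionKey())
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionSummaryAggregate())
    .setParallelism(256);   // scale the hot operator; 256 is a placeholder value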
