Flink checkpoints keep failing


Problem description

We are trying to set up a stateful Flink job using the RocksDB backend. We are using session windows with a 30-minute gap. We use an AggregateFunction, so we are not using any Flink state variables ourselves. Based on sampling, we have fewer than 20k events/s and 20-30 new sessions/s. Our sessions basically gather all the events, so the size of the session accumulator grows over time. We are using 10 GB of memory in total, with Flink 1.9 and 128 containers. The settings are as follows:

state.backend: rocksdb
state.checkpoints.dir: hdfs://nameservice0/myjob/path
state.backend.rocksdb.memory.managed: true
state.backend.incremental: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.4
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1

containerized.heap-cutoff-ratio: 0.45
taskmanager.network.memory.fraction: 0.5
taskmanager.network.memory.min: 512mb
taskmanager.network.memory.max: 2560mb
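
For illustration only, here is a minimal sketch of the kind of job described above. The Event POJO, its field names, and the class names are assumptions made for the example (not taken from the actual job), and timestamp/watermark assignment is omitted:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SessionJobSketch {

    // Hypothetical event type; the real job's event schema is not known.
    public static class Event {
        public String sessionKey;
        public long timestamp;
    }

    // The pattern described in the question: the accumulator keeps every event,
    // so its serialized size grows for the lifetime of the session.
    public static class CollectAllEvents
            implements AggregateFunction<Event, List<Event>, List<Event>> {
        @Override public List<Event> createAccumulator() { return new ArrayList<>(); }
        @Override public List<Event> add(Event value, List<Event> acc) { acc.add(value); return acc; }
        @Override public List<Event> getResult(List<Event> acc) { return acc; }
        @Override public List<Event> merge(List<Event> a, List<Event> b) { a.addAll(b); return a; }
    }

    public static DataStream<List<Event>> build(DataStream<Event> events) {
        return events
                .keyBy(new KeySelector<Event, String>() {
                    @Override public String getKey(Event e) { return e.sessionKey; }
                })
                .window(EventTimeSessionWindows.withGap(Time.minutes(30)))  // 30-minute session gap
                .aggregate(new CollectAllEvents());
    }
}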

From our monitoring at a given point in time, the RocksDB memtable size is less than 10 MB and our heap usage is less than 1 GB, but our direct memory usage (network buffers) is 2.5 GB. The buffer pool / buffer usage metrics are all at 1 (full). Our checkpoints keep failing; is it normal for the network buffers to use up this much memory?

I'd really appreciate it if you could give some suggestions :) Thank you!

Answer

For what it's worth, session windows do use Flink state internally. (So do most sources and sinks.) Depending on how you are gathering the session events into the session accumulator, this could be a performance problem. If you need to gather all of the events together, why are you doing this with an AggregateFunction, rather than having Flink do this for you?

For the best windowing performance, you want to use a ReduceFunction or an AggregateFunction that incrementally reduces/aggregates the window, keeping only a small bit of state that will ultimately be the result of the window. If, on the other hand, you use only a ProcessWindowFunction without pre-aggregation, then Flink will internally use an appending list state object that, when used with RocksDB, is very efficient -- it only has to serialize each event to append it to the end of the list. When the window is ultimately triggered, the list is delivered to you as an Iterable that is deserialized in chunks. By contrast, if you roll your own solution with an AggregateFunction, you may have RocksDB deserializing and re-serializing the accumulator on every access/update. This can become very expensive, and may explain why the checkpoints are failing.
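
As a rough sketch of the alternative described here (no pre-aggregation, so Flink keeps the window contents in its internal appending list state and hands them over as an Iterable when the session fires), reusing the hypothetical Event type from the earlier sketch; the string summary output is only a placeholder:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionProcessSketch {

    // Runs once per session when the window fires; the Iterable is backed by the
    // window's list state and is deserialized as it is iterated.
    public static class SummarizeSession
            extends ProcessWindowFunction<SessionJobSketch.Event, String, String, TimeWindow> {
        @Override
        public void process(String key, Context ctx,
                            Iterable<SessionJobSketch.Event> events, Collector<String> out) {
            int count = 0;
            for (SessionJobSketch.Event ignored : events) {
                count++;
            }
            out.collect(key + ": " + count + " events in session");
        }
    }

    public static DataStream<String> build(DataStream<SessionJobSketch.Event> events) {
        return events
                .keyBy(new KeySelector<SessionJobSketch.Event, String>() {
                    @Override public String getKey(SessionJobSketch.Event e) { return e.sessionKey; }
                })
                .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
                .process(new SummarizeSession());
    }
}

If only a small summary of each session is actually needed, the other route mentioned above, an incrementally aggregating ReduceFunction/AggregateFunction with a small fixed-size accumulator, avoids keeping the events around at all.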

Another interesting fact you've shared is that the buffer pool / buffer usage metrics show that they are fully utilized. This is an indication of significant backpressure, which in turn would explain why the checkpoints are failing. Checkpointing relies on the checkpoint barriers being able to traverse the entire execution graph, checkpointing each operator as they go, and completing a full sweep of the job before timing out. With backpressure, this can fail.
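
For reference on the timeout mentioned above: the checkpoint interval and timeout are set on the execution environment. The values below are placeholders, and, as the next paragraph explains, raising the timeout is not a substitute for removing the backpressure:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void configure(StreamExecutionEnvironment env) {
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);  // trigger a checkpoint every 60 s
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000L);      // abort checkpoints that take longer than 10 min
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);  // give the job room to drain between checkpoints
    }
}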

The most common cause of backpressure is under-provisioning -- or in other words, overwhelming the cluster. The network buffer pools become fully utilized because the operators can't keep up. The answer is not to increase buffering, but to remove/fix the bottleneck.
