Flink app's checkpoint size keeps growing


Problem description

I have a pipeline like this:

env.addSource(kafkaConsumer, name_source)
    .keyBy { value -> value.f0 }
    .window(EventTimeSessionWindows.withGap(Time.seconds(2)))
    .process(MyProcessor())
    .addSink(kafkaProducer)

The keys are guaranteed to be unique in the data that is being currently processed. Thus I would expect the state size to not grow over 2 seconds of data.
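
For context, MyProcessor would typically be a ProcessWindowFunction along these lines (a minimal sketch; the element type, key type, and output are assumptions, not the original implementation):

import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical stand-in for MyProcessor: called once per key per session window.
class MyProcessor : ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow>() {
    override fun process(
        key: String,
        context: Context,
        elements: Iterable<Tuple2<String, String>>,
        out: Collector<String>
    ) {
        // The session's buffered elements are handed over here; Flink clears the
        // backing window state once the window fires and is purged.
        out.collect("$key: ${elements.count()} events")
    }
}

Assuming that shape, each key's window state should indeed be dropped roughly 2 seconds (plus watermark lag) after the key's last event.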

However, I notice the state size has been steadily growing over the last day (since the app was deployed).

Is this a bug in Flink?

Using Flink 1.11.2 in AWS Kinesis Data Analytics.

Recommended answer

Kinesis Data Analytics always uses RocksDB as its state backend. With RocksDB, dead state isn't immediately cleaned up; it's merely marked with a tombstone and later compacted away. I'm not sure how KDA configures RocksDB compaction, but typically it's done when a level reaches a certain size -- and I suspect your state size is still small enough that compaction hasn't occurred.
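
KDA doesn't expose RocksDB compaction settings, but for reference, this is how Flink normally ties cleanup of user-managed keyed state to RocksDB compaction: state TTL with the compaction-filter cleanup strategy. This is only a sketch and only relevant if MyProcessor keeps its own keyed state; the helper name, state name, and one-day TTL are assumptions:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

// Hypothetical helper: expired entries are dropped during RocksDB compaction
// rather than eagerly, mirroring how dead state is reclaimed in general.
fun expiringStateDescriptor(): ValueStateDescriptor<String> {
    val ttlConfig = StateTtlConfig
        .newBuilder(Time.days(1))            // assumed retention, not from the question
        .cleanupInRocksdbCompactFilter(1000) // re-check the current time every 1000 processed entries
        .build()
    val descriptor = ValueStateDescriptor("my-state", String::class.java)
    descriptor.enableTimeToLive(ttlConfig)
    return descriptor
}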

With incremental checkpoints (which is what KDA does), checkpointing is done by copying RocksDB's SST files -- which in your case are presumably full of stale data. If you let this run long enough, you should eventually see a significant drop in checkpoint size once compaction has been done.
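
KDA enables this for you; on a self-managed Flink 1.11 cluster the equivalent setup would look roughly like this (a sketch; the checkpoint path, interval, and job name are placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    // true = incremental checkpoints: each checkpoint uploads only new RocksDB SST
    // files; earlier files stay referenced (and counted) until compaction rewrites them.
    val backend: StateBackend = RocksDBStateBackend("s3://my-bucket/checkpoints", true)
    env.setStateBackend(backend)
    env.enableCheckpointing(60_000) // checkpoint every 60 seconds

    // ... build the pipeline shown in the question ...

    env.execute("session-window-job")
}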

