Apache Flink checkpointing stuck


Problem description

We are running a job that has a ListState of between 300 GB and 400 GB, and sometimes the list can grow to a few thousand items. In our use case, every item must have its own TTL, so we create a new Timer for every new item of this ListState, with a RocksDB backend on S3.

There are currently about 140+ million timers (which will trigger at event.timestamp + 40 days).
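For context, the per-item timer pattern described above amounts to something like the following sketch. This is not the actual job code: the class name, the String key type, and the ControlGroup.timestamp field are assumptions for illustration only.

    import java.time.Duration

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    // Sketch only: every stored item gets its own event-time timer,
    // which is what produces the 140+ million pending timers.
    class PerItemTimerFunction extends KeyedProcessFunction[String, ControlGroup, ControlGroup] {

      private var items: ListState[ControlGroup] = _

      override def open(parameters: Configuration): Unit = {
        items = getRuntimeContext.getListState(
          new ListStateDescriptor[ControlGroup]("items", classOf[ControlGroup]))
      }

      override def processElement(event: ControlGroup,
                                  ctx: KeyedProcessFunction[String, ControlGroup, ControlGroup]#Context,
                                  out: Collector[ControlGroup]): Unit = {
        items.add(event)
        // assumed field: event.timestamp is the event time in epoch milliseconds
        ctx.timerService().registerEventTimeTimer(event.timestamp + Duration.ofDays(40).toMillis)
      }
    }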

Our problem is that suddenly the checkpointing of the job gets stuck, or becomes VERY slow (like 1% in a few hours), until it eventually times out. It generally stops (the Flink dashboard shows 0/12 (0%) while the previous lines show 12/12 (100%)) on a piece of code which is pretty simple:

[...]
    val myStream = env.addSource(someKafkaConsumer)
      .rebalance
      .map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
      .uid("src_kafka_stream")
      .name("some_name")

      myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
        .getSideOutput(outputTag)
        .keyBy(_.name)
        .addSink(sink)
[...]

Some more info:

  • AT_LEAST_ONCE checkpointing mode seems to get stuck more easily than EXACTLY_ONCE.
  • A few months ago the state grew to as much as 1.5 TB with, I think, billions of timers, without any issue.
  • RAM, CPU, and networking look normal on the machines running the two task managers.
  • state.backend.rocksdb.thread.num = 4
  • The first incident happened right after we received a burst of events (on the order of millions within minutes), but not the previous one.
  • All events come from Kafka topics.
  • While in AT_LEAST_ONCE checkpointing mode, the job still runs and consumes normally.

This is the second time this has happened to us: the topology runs fine with a few million events per day and then suddenly stops checkpointing. We have no idea what could cause this.

Can anyone think of what could suddenly cause the checkpointing to get stuck?

Recommended answer

A few thoughts:

If you have many timers all firing more or less simultaneously, this storm of timers will prevent anything else from happening: the tasks will loop calling onTimer until there are no more timers to be fired, during which time their input queues will be ignored and checkpoint barriers will not progress.

If this is the cause of your troubles, you might add some random jitter to your timers so that event storms don't turn into timer storms later on. Reorganizing things to use State TTL might be another option.
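Both ideas are sketched below under some assumptions: the 6-hour jitter window is an arbitrary illustrative value, ControlGroup is the question's own event type, and the State TTL API shown (StateTtlConfig with cleanupInRocksdbCompactFilter) is the one available in recent Flink 1.x releases.

    import java.time.Duration
    import java.util.concurrent.ThreadLocalRandom

    import org.apache.flink.api.common.state.{ListStateDescriptor, StateTtlConfig}
    import org.apache.flink.api.common.time.Time

    object TimerMitigations {

      // 1) Jitter: spread the per-item timers over a window (up to 6 hours here)
      //    so that a burst of events does not become a timer storm 40 days later.
      def jitteredTimerTimestamp(eventTimestamp: Long): Long = {
        val ttl    = Duration.ofDays(40).toMillis
        val jitter = ThreadLocalRandom.current().nextLong(Duration.ofHours(6).toMillis)
        eventTimestamp + ttl + jitter
      }

      // 2) State TTL: let Flink expire list entries itself (list state is expired
      //    per element) instead of registering one timer per item. Note that state
      //    TTL is based on processing time, not event time.
      val ttlConfig: StateTtlConfig = StateTtlConfig
        .newBuilder(Time.days(40))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupInRocksdbCompactFilter(1000) // re-query the current time every 1000 processed entries
        .build()

      val itemsDescriptor = new ListStateDescriptor[ControlGroup]("items", classOf[ControlGroup])
      itemsDescriptor.enableTimeToLive(ttlConfig)
    }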

If you have a lot of timers on the heap, this can lead to very high GC overhead. This won't necessarily fail the job, but it can make checkpointing unstable. In this case, moving the timers into RocksDB may help.
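If you go that route, it is a configuration change rather than a code change; with the RocksDB state backend on Flink 1.x, the relevant flink-conf.yaml option should be the one below (in more recent versions RocksDB may already be the default location for timers):

    state.backend.rocksdb.timer-service.factory: ROCKSDB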

Also: since you are using RocksDB, switching from ListState to MapState, with time as the key, would let you remove single entries without having to reserialize the entire list after each update. (With RocksDB, each key/value pair in a MapState is a separate RocksDB object.) Making the cleanup more efficient in this way might be the best remedy.
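A minimal sketch of that layout, reusing the hypothetical names from the question section above (ControlGroup with a timestamp field, a String key): each map entry is a separate RocksDB key/value pair, so expiring one item is a single remove() rather than a rewrite of the whole list.

    import java.time.Duration

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    class PerItemTtlFunction extends KeyedProcessFunction[String, ControlGroup, ControlGroup] {

      // items keyed by their expiration timestamp instead of a single ListState
      private var itemsByExpiry: MapState[java.lang.Long, ControlGroup] = _

      override def open(parameters: Configuration): Unit = {
        itemsByExpiry = getRuntimeContext.getMapState(
          new MapStateDescriptor[java.lang.Long, ControlGroup](
            "itemsByExpiry", classOf[java.lang.Long], classOf[ControlGroup]))
      }

      override def processElement(event: ControlGroup,
                                  ctx: KeyedProcessFunction[String, ControlGroup, ControlGroup]#Context,
                                  out: Collector[ControlGroup]): Unit = {
        val expireAt = event.timestamp + Duration.ofDays(40).toMillis
        // note: two events with the same expireAt would collide on this key; a real
        // implementation would disambiguate (e.g. expireAt plus an event id)
        itemsByExpiry.put(expireAt, event)
        ctx.timerService().registerEventTimeTimer(expireAt)
      }

      override def onTimer(timestamp: Long,
                           ctx: KeyedProcessFunction[String, ControlGroup, ControlGroup]#OnTimerContext,
                           out: Collector[ControlGroup]): Unit = {
        // only the expired entry is touched; no reserialization of the other items
        itemsByExpiry.remove(timestamp)
      }
    }

Combined with the jitter or State TTL suggestions above, this keeps each expiration to a constant amount of work instead of a full rewrite of a large list value.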
