Apache Flink checkpointing stuck

Problem description

We are running a job that has a ListState of between 300GB and 400GB, and at times the list can grow to a few thousand items. In our use case, every item must have its own TTL, so we create a new Timer for every new item of this ListState, with a RocksDB backend on S3.

This currently amounts to roughly 140+ million timers (that will trigger at event.timestamp + 40 days).
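
For reference, the per-item TTL pattern described above roughly corresponds to a KeyedProcessFunction like the sketch below. It is only a minimal illustration under assumptions: the Event case class, its fields and the String key are made up here, since the question does not show the state-handling code.

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    import scala.collection.JavaConverters._

    // Hypothetical event type; the real job's types are not shown in the question.
    case class Event(name: String, timestamp: Long, payload: String)

    class PerItemTtlFunction extends KeyedProcessFunction[String, Event, Event] {

      private val ttlMs: Long = 40L * 24 * 60 * 60 * 1000 // 40 days

      private lazy val items: ListState[Event] =
        getRuntimeContext.getListState(
          new ListStateDescriptor[Event]("items", classOf[Event]))

      override def processElement(event: Event,
                                  ctx: KeyedProcessFunction[String, Event, Event]#Context,
                                  out: Collector[Event]): Unit = {
        items.add(event)
        // One timer per item, firing 40 days after the event's own timestamp.
        ctx.timerService().registerEventTimeTimer(event.timestamp + ttlMs)
        out.collect(event)
      }

      override def onTimer(timestamp: Long,
                           ctx: KeyedProcessFunction[String, Event, Event]#OnTimerContext,
                           out: Collector[Event]): Unit = {
        // Drop expired items; with ListState the whole list has to be rewritten.
        val remaining = items.get().asScala.filter(e => e.timestamp + ttlMs > timestamp).toSeq
        items.update(remaining.asJava)
      }
    }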

Our problem is that the checkpointing of the job suddenly gets stuck, or becomes VERY slow (like 1% in a few hours), until it eventually times out. It generally stops (the Flink dashboard shows 0/12 (0%) while the previous lines show 12/12 (100%)) on a piece of code which is pretty simple:

[...]
    val myStream = env.addSource(someKafkaConsumer)
      .rebalance
      .map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
      .uid("src_kafka_stream")
      .name("some_name")

    myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
      .getSideOutput(outputTag)
      .keyBy(_.name)
      .addSink(sink)
[...]

Some more info:

  • Checkpointing seems to get stuck more easily in AT_LEAST_ONCE mode than in EXACTLY_ONCE mode.
  • A few months ago the state grew to 1.5TB and, I think, billions of timers, without any problem.
  • RAM, CPU and network on the machines running the two task managers.
  • state.backend.rocksdb.thread.num = 4
  • The first occurrence happened while we were receiving a huge burst of events (around millions within minutes), while the latest one did not.
  • All events come from Kafka topics.
  • While in AT_LEAST_ONCE checkpointing mode, the job still runs and consumes normally.

This is the second time this has happened to us: the topology runs very fine with a few million events per day and then suddenly stops checkpointing. We have no idea what could cause this.

Can anyone think of what could suddenly cause the checkpointing to get stuck?

Recommended answer

A few thoughts:

If you have many timers all firing more-or-less simultaneously, this storm of timers will prevent anything else from happening -- the tasks will loop calling onTimer until there are no more timers to be fired, during which time their input queues will be ignored, and checkpoint barriers will not progress.

If this is the cause of your troubles, you might add some random jitter to your timers so that event storms don't turn into timer storms later on. Reorganizing things to use State TTL might be another option.
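
As an illustration of both ideas, here is a minimal sketch. The 40-day TTL comes from the question; the one-hour jitter window, the names and the TTL cleanup settings are assumptions, and note that Flink's State TTL is based on processing time rather than event time.

    import java.util.concurrent.ThreadLocalRandom

    import org.apache.flink.api.common.state.{ListStateDescriptor, StateTtlConfig}
    import org.apache.flink.api.common.time.Time

    object TimerTuning {

      private val TtlMs       = 40L * 24 * 60 * 60 * 1000 // 40 days, as in the question
      private val MaxJitterMs = 60L * 60 * 1000            // spread timers over up to 1 hour (arbitrary)

      // Timer timestamp with random jitter, so that an event burst does not
      // turn into a timer burst 40 days later.
      def jitteredTimerTime(eventTimestamp: Long): Long =
        eventTimestamp + TtlMs + ThreadLocalRandom.current().nextLong(MaxJitterMs)

      // Alternative: let the state backend expire entries via State TTL instead of
      // user timers. Note that State TTL works on processing time, not event time.
      def listStateWithTtl[T](name: String, clazz: Class[T]): ListStateDescriptor[T] = {
        val ttlConfig = StateTtlConfig
          .newBuilder(Time.days(40))
          .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
          .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
          .cleanupInRocksdbCompactFilter(1000) // purge expired entries during RocksDB compaction
          .build()

        val descriptor = new ListStateDescriptor[T](name, clazz)
        descriptor.enableTimeToLive(ttlConfig)
        descriptor
      }
    }

In a process function, the timer would then be registered with ctx.timerService().registerEventTimeTimer(TimerTuning.jitteredTimerTime(event.timestamp)) instead of the exact event.timestamp + 40 days.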

If you have a lot of timers on the heap, this can lead to very high GC overhead. This won't necessarily fail the job, but can make checkpointing unstable. In this case, moving the timers into RocksDB may help.
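
If the timers are currently heap-based, the switch that moves them into RocksDB is the timer-service factory of the RocksDB backend. As a hedged note, in Flink 1.10 and later RocksDB is already the default location for timers, so this only matters on older versions or if the setting was overridden:

    # flink-conf.yaml
    # Keep timer state in RocksDB instead of on the JVM heap
    # (the default for the RocksDB state backend since Flink 1.10).
    state.backend.rocksdb.timer-service.factory: ROCKSDB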

Also: since you are using RocksDB, switching from ListState to MapState, with time as the key, would let you remove single entries without having to reserialize the entire list after each update. (With RocksDB, each key/value pair in a MapState is a separate RocksDB object.) Making the cleanup more efficient in this way might be the best remedy.
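
A sketch of that MapState layout, with the same caveats as before (the Event type is made up, and it assumes at most one item per key and timestamp):

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    // Same illustrative event type as in the earlier sketch.
    case class Event(name: String, timestamp: Long, payload: String)

    class MapStatePerItemTtl extends KeyedProcessFunction[String, Event, Event] {

      private val ttlMs: Long = 40L * 24 * 60 * 60 * 1000 // 40 days

      // With RocksDB, every map entry is its own key/value pair, so one expired
      // item can be deleted without reserializing the rest of the state.
      private lazy val itemsByTime: MapState[java.lang.Long, Event] =
        getRuntimeContext.getMapState(
          new MapStateDescriptor[java.lang.Long, Event](
            "itemsByTime", classOf[java.lang.Long], classOf[Event]))

      override def processElement(event: Event,
                                  ctx: KeyedProcessFunction[String, Event, Event]#Context,
                                  out: Collector[Event]): Unit = {
        itemsByTime.put(event.timestamp, event)
        ctx.timerService().registerEventTimeTimer(event.timestamp + ttlMs)
        out.collect(event)
      }

      override def onTimer(timestamp: Long,
                           ctx: KeyedProcessFunction[String, Event, Event]#OnTimerContext,
                           out: Collector[Event]): Unit = {
        // The timer fired exactly ttlMs after the item's timestamp,
        // so only that single entry needs to be removed.
        itemsByTime.remove(timestamp - ttlMs)
      }
    }

Here each entry can be dropped on its own timer, which is what makes the cleanup cheap compared to rewriting the whole list.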
