Flink checkpoint size is growing over 20 GB and checkpoint time takes over 1 minute


Question

First of all:

  • I'm new to Flink (I understand the principles and I'm able to build any basic streaming job I need)
  • I use Kinesis Analytics to run my Flink job, which by default uses incremental checkpointing with a 1-minute interval.
  • The Flink job reads events from a Kinesis stream using a FlinkKinesisConsumer and a custom deserializer (it deserializes the bytes into a simple Java object which is used throughout the job)

What I would like to achieve is simply counting how many events of ENTITY_ID/FOO and ENTITY_ID/BAR there have been over the past 24 hours. It is important that this count is as accurate as possible, which is why I'm using this Flink feature instead of keeping a running sum myself over a 5-minute tumbling window. I also want to have a count of 'TOTAL' events from the start (and not just for the past 24 h), so I also output the count of events for the past 5 minutes in the result, so that the post-processing app can simply take these 5-minute counts and do a running sum. (This count doesn't have to be accurate, and it's OK if there is an outage and I lose some counts.)

Now, this job was working pretty well until last week, when we had a surge in traffic (10 times more). From that point on, Flink went bananas. The checkpoint size started slowly growing from ~500 MB to 20 GB, and checkpoint times were taking around 1 minute and growing over time. The application started failing and was never able to fully recover, and the event iterator age shot up and never came back down, so no new events were being consumed.

Since I'm new to Flink, I'm not entirely sure whether the way I'm doing the sliding count is completely unoptimised or just plain wrong.

This is a small snippet of the key part of the code:

The source (MyJsonDeserializationSchema extends AbstractDeserializationSchema and simply reads the bytes and creates the Event object):

SourceFunction<Event> source =
      new FlinkKinesisConsumer<>("input-kinesis-stream", new MyJsonDeserializationSchema(), kinesisConsumerConfig);
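
(kinesisConsumerConfig is not shown in the snippet; just for context, a typical consumer configuration might look roughly like the sketch below, with assumed example values rather than the job's actual settings.)

// Assumed example values, not the actual configuration from the job.
// ConsumerConfigConstants is org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");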

The input event, a simple Java POJO which will be used in the Flink operators:

public class Event implements Serializable {
  public String entityId;
  public String entityType;
  public String entityName;
  public long eventTimestamp = System.currentTimeMillis();
}
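
(MyJsonDeserializationSchema itself is not shown; assuming the Kinesis records are JSON, a minimal sketch of such a schema could look like the following, using Jackson, which is an assumption rather than necessarily what the real class does.)

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import java.io.IOException;

public class MyJsonDeserializationSchema extends AbstractDeserializationSchema<Event> {
  // Jackson mapper turning the raw record bytes into an Event POJO
  private static final ObjectMapper MAPPER = new ObjectMapper();

  @Override
  public Event deserialize(byte[] message) throws IOException {
    return MAPPER.readValue(message, Event.class);
  }
}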

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> eventsStream = env.addSource(source)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(30)) {
        @Override
        public long extractTimestamp(Event event) {
          return event.eventTimestamp;
        }
      });

DataStream<Event> fooStream = eventsStream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "foo".equalsIgnoreCase(event.entityType);
        }
      });

 DataStream<Event> barStream = eventsStream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "bar".equalsIgnoreCase(event.entityType);
        }
      });


StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Table fooTable = tEnv.fromDataStream(fooStream, "entityId, entityName, entityType, eventTimestamp.rowtime");
    tEnv.registerTable("Foo", fooTable);
    Table barTable = tEnv.fromDataStream(barStream, "entityId, entityName, entityType, eventTimestamp.rowtime");
    tEnv.registerTable("Bar", barTable);

Table slidingFooCountTable = fooTable
      .window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as slidingFooId, entityId as slidingFooEntityId, entityName as slidingFooEntityName, entityType.count as slidingFooCount, minuteWindow.rowtime as slidingFooMinute");

Table slidingBarCountTable = barTable
      .window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as slidingBarId, entityId as slidingBarEntityId, entityName as slidingBarEntityName, entityType.count as slidingBarCount, minuteWindow.rowtime as slidingBarMinute");

    Table tumblingFooCountTable = fooTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as tumblingFooId, entityId as tumblingFooEntityId, entityName as tumblingFooEntityName, entityType.count as tumblingFooCount, minuteWindow.rowtime as tumblingFooMinute");
   
    Table tumblingBarCountTable = barTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as tumblingBarId, entityId as tumblingBarEntityId, entityName as tumblingBarEntityName, entityType.count as tumblingBarCount, minuteWindow.rowtime as tumblingBarMinute");

    Table aggregatedTable = slidingFooCountTable
      .leftOuterJoin(slidingBarCountTable, "slidingFooId = slidingBarId && slidingFooMinute = slidingBarMinute")
      .leftOuterJoin(tumblingBarCountTable, "slidingFooId = tumblingBarId && slidingFooMinute = tumblingBarMinute")
      .leftOuterJoin(tumblingFooCountTable, "slidingFooId = tumblingFooId && slidingFooMinute = tumblingFooMinute")
      .select("slidingFooMinute as timestamp, slidingFooEntityId as entityId, slidingFooEntityName as entityName, slidingFooCount, slidingBarCount, tumblingFooCount, tumblingBarCount");

    DataStream<Result> result = tEnv.toAppendStream(aggregatedTable, Result.class);
    result.addSink(sink); // write to an output stream to be picked up by a lambda function

I would greatly appreciate it if someone with more experience working with Flink could comment on the way I have done my counting. Is my code completely over-engineered? Is there a better and more efficient way of counting events over a 24-hour period?

I have read somewhere on Stack Overflow that @DavidAnderson suggested creating our own sliding window using map state and slicing the events by timestamp. However, I'm not exactly sure what this means, and I didn't find any code example showing it.
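
My rough understanding of that idea is something like the sketch below (my own interpretation with assumed names, not code from that answer): key the stream by entity, bucket the counts into 5-minute slices in MapState, and when a slice closes, sum the slices that fall inside the last 24 hours and evict the older ones.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SlidingDailyCount extends KeyedProcessFunction<String, Event, Tuple2<String, Long>> {

  private static final long SLICE_MS = 5 * 60 * 1000L;        // 5 minute slices
  private static final long WINDOW_MS = 24 * 60 * 60 * 1000L; // 24 hour sliding window

  // slice start timestamp -> number of events seen in that slice
  private transient MapState<Long, Long> countsPerSlice;

  @Override
  public void open(Configuration parameters) {
    countsPerSlice = getRuntimeContext().getMapState(
        new MapStateDescriptor<>("countsPerSlice", Long.class, Long.class));
  }

  @Override
  public void processElement(Event event, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    // bucket the event into its 5 minute slice instead of keeping the raw event
    long slice = event.eventTimestamp - (event.eventTimestamp % SLICE_MS);
    Long current = countsPerSlice.get(slice);
    countsPerSlice.put(slice, current == null ? 1L : current + 1L);
    // emit a result when this slice closes (event time timer)
    ctx.timerService().registerEventTimeTimer(slice + SLICE_MS);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    long windowStart = timestamp - WINDOW_MS;
    long total = 0L;
    List<Long> expired = new ArrayList<>();
    for (Map.Entry<Long, Long> entry : countsPerSlice.entries()) {
      if (entry.getKey() < windowStart) {
        expired.add(entry.getKey()); // slice fell out of the 24 hour window
      } else {
        total += entry.getValue();
      }
    }
    for (Long key : expired) {
      countsPerSlice.remove(key);
    }
    out.collect(Tuple2.of(ctx.getCurrentKey(), total));
  }
}

It would presumably be used as something like eventsStream.keyBy(e -> e.entityId).process(new SlidingDailyCount()), so the state kept per key is just one counter per 5-minute slice (at most 288 entries) rather than every raw event.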

Answer

You are creating quite a few windows there. If you create a sliding window with a size of 24 hours and a slide of 5 minutes, that means there are a lot of open windows at any given time (24 h / 5 min = 288 overlapping windows per key), so if you think about it, you can expect that all the data you have received in a given day will be checkpointed in at least one window. So it's certain that the size and time of the checkpoint will grow as the data itself grows.

To be able to answer whether the code can be rewritten, you would need to provide more details on what exactly you are trying to achieve here.

