Flink checkpoint size grows beyond 20GB and checkpoints take over 1 minute

Problem description

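First of all:

  • I'm new to Flink (I understand the principles and am able to create any basic streaming job I need).
  • I'm using Kinesis Analytics to run my Flink job, and by default it uses incremental checkpointing with a 1-minute interval.
  • The Flink job reads events from a Kinesis stream using FlinkKinesisConsumer and a custom deserializer (which deserializes the bytes into a simple Java object that is used throughout the job).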

What I would like to achieve is simply counting how many events of ENTITY_ID/FOO and ENTITY_ID/BAR there are for the past 24 hours. It is important that this count is as accurate as possible, and this is why I'm using this Flink feature instead of doing a running sum myself on a 5-minute tumbling window. I also want to have a count of 'TOTAL' events from the start (and not just for the past 24h), so I also output in the result the count of events for the past 5 minutes, so that the post-processing app can simply take these 5 minutes of data and do a running sum. (This count doesn't have to be accurate, and it's OK if there is an outage and I lose some counts.)
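The post-processing side of this design (adding each 5-minute tumbling count to an all-time total per entity) could be as simple as the plain-Java sketch below. TumblingCount and RunningTotals are hypothetical names, not part of the original job, and the sketch assumes the post-processor receives batches of per-entity 5-minute counts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical shape of one 5-minute tumbling result, mirroring the
// tumbling*Count columns the job emits (names are illustrative only).
final class TumblingCount {
    final String entityId;
    final long count;

    TumblingCount(String entityId, long count) {
        this.entityId = entityId;
        this.count = count;
    }
}

// Keeps an all-time running total per entity from successive 5-minute counts.
final class RunningTotals {
    private final Map<String, Long> totals = new HashMap<>();

    // Adds every 5-minute count in the batch to its entity's total.
    void addAll(List<TumblingCount> batch) {
        for (TumblingCount c : batch) {
            totals.merge(c.entityId, c.count, Long::sum);
        }
    }

    // Returns 0 for entities that have never been seen.
    long total(String entityId) {
        return totals.getOrDefault(entityId, 0L);
    }
}
```

If a batch is lost during an outage, its counts are simply missing from the total, which matches the stated tolerance for inaccuracy in this part of the output.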

Now, this job was working pretty well until last week, when we had a surge (10 times more) in traffic. From that point on, Flink went bananas. Checkpoint size started to slowly grow from ~500MB to 20GB, and checkpoints were taking around 1 minute and growing over time. The application started failing and was never able to fully recover, and the iterator age shot up and never went back down, so no new events were being consumed.

Since I'm new to Flink, I'm not entirely sure whether the way I'm doing the sliding count is completely unoptimized or plain wrong.

This is a small snippet of the key part of the code:

The source (MyJsonDeserializationSchema extends AbstractDeserializationSchema and simply reads the bytes and creates the Event object):

SourceFunction<Event> source =
      new FlinkKinesisConsumer<>("input-kinesis-stream", new MyJsonDeserializationSchema(), kinesisConsumerConfig);

The input event, a simple Java POJO that is used in the Flink operators:

public class Event implements Serializable {
  public String entityId;
  public String entityType;
  public String entityName;
  public long eventTimestamp = System.currentTimeMillis();
}

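env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Attach the Kinesis consumer defined above as the job's source.
DataStream<Event> kinesis = env.addSource(source);

// Event-time timestamps come from Event.eventTimestamp; the watermark trails
// the highest timestamp seen so far by 30 seconds.
DataStream<Event> eventsStream = kinesis
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(30)) {
        @Override
        public long extractTimestamp(Event event) {
          return event.eventTimestamp;
        }
      });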

DataStream<Event> fooStream = eventsStream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "foo".equalsIgnoreCase(event.entityType);
        }
      });

DataStream<Event> barStream = eventsStream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "bar".equalsIgnoreCase(event.entityType);
        }
      });


StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Convert each filtered stream to a table; "eventTimestamp.rowtime" exposes the
// stream's event-time timestamp as the rowtime attribute.
Table fooTable = tEnv.fromDataStream(fooStream, "entityId, entityName, entityType, eventTimestamp.rowtime");
tEnv.registerTable("Foo", fooTable);
Table barTable = tEnv.fromDataStream(barStream, "entityId, entityName, entityType, eventTimestamp.rowtime");
tEnv.registerTable("Bar", barTable);

// 24h sliding counts, emitted every 5 minutes per (entityId, entityName).
Table slidingFooCountTable = fooTable
      .window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as slidingFooId, entityId as slidingFooEntityid, entityName as slidingFooEntityName, entityType.count as slidingFooCount, minuteWindow.rowtime as slidingFooMinute");

Table slidingBarCountTable = barTable
      .window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as slidingBarId, entityId as slidingBarEntityid, entityName as slidingBarEntityName, entityType.count as slidingBarCount, minuteWindow.rowtime as slidingBarMinute");

// Tumbling counts per (entityId, entityName); tumblingWindowTime is defined elsewhere.
Table tumblingFooCountTable = fooTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as tumblingFooId, entityId as tumblingFooEntityId, entityName as tumblingFooEntityName, entityType.count as tumblingFooCount, minuteWindow.rowtime as tumblingFooMinute");

Table tumblingBarCountTable = barTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as tumblingBarId, entityId as tumblingBarEntityId, entityName as tumblingBarEntityName, entityType.count as tumblingBarCount, minuteWindow.rowtime as tumblingBarMinute");

// Note: these are regular (non-windowed) joins. Flink keeps rows from both
// inputs in state for such joins, so without an idle-state retention setting
// this state can keep growing.
Table aggregatedTable = slidingFooCountTable
      .leftOuterJoin(slidingBarCountTable, "slidingFooId = slidingBarId && slidingFooMinute = slidingBarMinute")
      .leftOuterJoin(tumblingBarCountTable, "slidingFooId = tumblingBarId && slidingFooMinute = tumblingBarMinute")
      .leftOuterJoin(tumblingFooCountTable, "slidingFooId = tumblingFooId && slidingFooMinute = tumblingFooMinute")
      .select("slidingFooMinute as timestamp, slidingFooEntityid as entityId, slidingFooEntityName as entityName, slidingFooCount, slidingBarCount, tumblingFooCount, tumblingBarCount");

DataStream<Result> result = tEnv.toAppendStream(aggregatedTable, Result.class);
result.addSink(sink); // write to an output stream to be picked up by a lambda function

I would greatly appreciate it if someone with more experience working with Flink could comment on the way I have done my counting. Is my code completely over-engineered? Is there a better and more efficient way of counting events over a 24h period?

I have read somewhere on Stack Overflow @DavidAnderson suggesting creating our own sliding window using MapState and slicing the events by timestamp. However, I'm not exactly sure what this means, and I didn't find any code example showing it.
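One reading of the "MapState slicing" idea is sketched below in plain Java, under stated assumptions: per key, keep one counter per 5-minute slice (slice start timestamp to count), let each event update exactly one slice, and when the watermark passes a slice boundary, sum the slices inside the last 24 hours and drop the slices no later window needs. In a Flink job this logic would presumably live in a KeyedProcessFunction keyed by entity, with the HashMap replaced by MapState<Long, Long> and onSliceEnd driven by an event-time timer; that mapping, and the class name SlicedSlidingCounter, are hypothetical and not taken from the original answer.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Plain-Java simulation of the per-key state of a "sliced" 24h sliding count.
// In Flink, `slices` would be a MapState<Long, Long> (assumption).
final class SlicedSlidingCounter {
    static final long SLICE_MS = 5 * 60 * 1000L;         // 5-minute slide
    static final long WINDOW_MS = 24 * 60 * 60 * 1000L;  // 24-hour window

    // slice start -> number of events with timestamp in [start, start + SLICE_MS)
    private final Map<Long, Long> slices = new HashMap<>();

    static long sliceStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, SLICE_MS);
    }

    // Each event increments exactly one slice. Returns the slice end, which is
    // where a Flink job would register an event-time timer.
    long add(long eventTimestamp) {
        long start = sliceStart(eventTimestamp);
        slices.merge(start, 1L, Long::sum);
        return start + SLICE_MS;
    }

    // Called when the watermark passes windowEnd (a slice boundary). Returns the
    // count for [windowEnd - WINDOW_MS, windowEnd) and evicts stale slices.
    long onSliceEnd(long windowEnd) {
        long windowStart = windowEnd - WINDOW_MS;
        long sum = 0;
        Iterator<Map.Entry<Long, Long>> it = slices.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            long start = e.getKey();
            if (start >= windowStart && start < windowEnd) {
                sum += e.getValue();
            }
            // The next window starts at windowStart + SLICE_MS, so any slice
            // starting at or before windowStart is no longer needed.
            if (start <= windowStart) {
                it.remove();
            }
        }
        return sum;
    }

    int sliceCount() {
        return slices.size();
    }
}
```

The design choice is that an event touches a single counter instead of updating every overlapping window it belongs to, and at most 24h / 5min slices (plus any not-yet-closed ones) are kept per key.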

Recommended answer

You are creating quite a few windows in there. If you create a sliding window with a size of 24h and a slide of 5 minutes, there will be a lot of open windows at any time, so you can expect all the data you have received in a given day to be checkpointed in at least one window. So the size and duration of the checkpoint will inevitably grow as the data itself grows.
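The amount of overlap can be made concrete. The plain-Java sketch below enumerates the [start, end) windows that contain a given timestamp, using the same alignment arithmetic as Flink's sliding event-time window assigner with a zero offset (an illustration, not Flink code; SlidingWindowMath is a hypothetical name). With a 24h size and a 5-minute slide, every timestamp falls into 24h / 5min = 288 windows, so each event has to be accounted for in 288 open windows per key.

```java
import java.util.ArrayList;
import java.util.List;

// Lists the sliding windows [start, start + size) that contain a timestamp,
// assuming windows are aligned to multiples of `slide` (zero offset).
final class SlidingWindowMath {
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that still contains the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For any timestamp, windowsFor(ts, 24L * 60 * 60 * 1000, 5L * 60 * 1000) returns 288 windows, which is why the number of open windows scales with both the number of keys and the size/slide ratio.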

To tell whether the code can be rewritten, you would need to provide more details on what exactly you are trying to achieve here.
