Causes of CPU load increase on the machine when Flink runs for more than 12 hours

Problem description

I have a Flink job with parallelism set to 6 and a few simple transformations. The issue is that once Flink has been running for more than 12 hours, for example, the load on the machine starts to increase. At first I thought this was because of the traffic coming into Flink during certain hours of the day, but when the traffic goes down, the load on the machine stays elevated: lower than before, but still higher than normal.

Use case:

DataStream<Event> from_source = rabbitConsumer
                .flatMap(new RabbitMQConsumer())
                .assignTimestampsAndWatermarks(new PeriodicExtractor());
SingleOutputStreamOperator<Event> data_stream = from_source
                    .filter(new NullidsFilterFunction());
KeyedStream<Event, String> keyed_stream = data_stream.keyBy(k -> k.id);

/*one stateful operator*/
data_stream.map(new EventCount(x))
            .keyBy(k -> new Date(k.timestamp.getTime()).toString())
            .window(TumblingEventTimeWindows.of(Time.minutes(30)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*two stateful operator*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.minutes(10)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*three*/
keyed_stream.filter(new FilterFunction())
            .map(new AvailabilityCheckClass())
            .addSink(new SinkFuncion());

/*four*/
KeyedStream<Event, String> product_view_keyed_stream = data_stream
            .filter(new FilterFunction())
            .map(new EventProdView(x))
            .keyBy(k -> k.id + new Date(k.timestamp.getTime()));
product_view_keyed_stream.addSink(new SinkFuncion());

/*five stateful operator*/
product_view_keyed_stream.window(TumblingEventTimeWindows.of(Time.minutes(30)))
          .process(new MyProcessWindowFunction())
          .addSink(new SinkFuncion());

/*Six stateful operator with 4 ConcurrentHashMap into the state*/
keyed_stream.flatMap(new FlatMapFunction())
            .addSink(new SinkFuncion());

/*seven stateful operator*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.minutes(10)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*eight stateful operator*/
data_stream.filter(new FilterFunction())
           .keyBy(k -> k.rType.equals("ClickIdent") ? k.avidat : k.email_sha2)
           .flatMap(new FlatMapFunction())
           .addSink(new SinkFuncion());

My question: what could be causing the high CPU usage when my Flink job has been running for more than 6 hours, for example?

Insights: heap memory looks fine (no OOM), checkpoints are all completed, no events are lost, and JVM CPU consumption looks fine too. The CMS GC young-generation counter always increases (this worries me; it should be normal since it is a counter, but it increases too fast). The job runs as a simple Java application (local execution, not on a cluster with a Flink installation, just java -jar flink.jar). I don't know whether that matters, just sharing the information.

One hour example:

Thanks a lot!

Answer

Since you are using a heap-based state backend (the FSStateBackend keeps its working state on the JVM heap), and the state TTL is configured to 1 (or 3) days, it's to be expected that the state size will grow. How much it will grow is very application specific; it depends on how your key space grows over time.
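A minimal sketch of what that kind of configuration typically looks like (the checkpoint URI, the one-day TTL, the cleanup strategy, and the "event-count" descriptor name are illustrative assumptions, not taken from your job):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Heap-based backend: working state stays on the JVM heap,
// snapshots go to the filesystem URI below (placeholder path).
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

// A 1-day TTL only removes entries on access or during cleanup,
// so heap usage can still grow while the key space keeps growing.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupFullSnapshot() // drop expired entries when a full snapshot is taken
        .build();

// Attach the TTL to whatever state descriptor the stateful operators use
// (name and type here are placeholders).
ValueStateDescriptor<Long> countDescriptor =
        new ValueStateDescriptor<>("event-count", Long.class);
countDescriptor.enableTimeToLive(ttlConfig);

Note that cleanupFullSnapshot() trims expired entries from the snapshot files, not from the live heap, so local state still shrinks only as expired entries are accessed; that is consistent with state (and load) creeping up over a long-running job.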

Can you put the 603MB checkpoint size in some context? I.e., how much state is that for each distinct key? It sounds like you are surprised, but it's not clear why.

There are many reasons why checkpoints can become slow, but generally this is an indication of either backpressure or some sort of resource contention in reaching the remote filesystem -- e.g., S3 rate limits. If you look at the checkpointing statistics in the Flink WebUI, you can look for clues there. Check whether the checkpoint barriers are taking a long time to traverse the execution graph, or whether the asynchronous part of checkpointing is taking a long time to write the checkpoint to the remote disks. And look for asymmetries -- is one instance taking much longer and holding much more state than the others?
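Since this job is launched with a plain java -jar (local execution), the WebUI is not there by default; it can be enabled for a local environment roughly like this (a sketch, assuming the flink-runtime-web dependency is on the classpath and port 8081 is free):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
conf.setInteger(RestOptions.PORT, 8081); // port choice is an assumption

// Still local execution, but with the WebUI served at http://localhost:8081,
// so checkpoint sizes, durations and backpressure can be watched over time.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);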

If you are doing any blocking i/o in a user function, that can cause trouble. Or you may have significant data skew (e.g., a hot key). Or a slow network between the task manager and the distributed filesystem. Or the cluster may be under provisioned -- you may need to increase the parallelism.
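If the blocking calls are external lookups, Flink's Async I/O operator is one common way to keep them from tying up the task threads; a rough sketch (Event comes from your job, while EnrichedEvent, the lookup method, and the timeout/capacity values are made-up placeholders):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncEnrichment extends RichAsyncFunction<Event, EnrichedEvent> {

    @Override
    public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
        // Run the (potentially blocking) lookup off the operator thread;
        // in practice a dedicated executor is preferable to the common pool.
        CompletableFuture
                .supplyAsync(() -> lookupExternally(event))
                .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }

    private EnrichedEvent lookupExternally(Event event) {
        // placeholder for the real external call
        return new EnrichedEvent(event);
    }
}

// Wiring it in, with a timeout and a bounded number of in-flight requests:
DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
        data_stream, new AsyncEnrichment(), 5, TimeUnit.SECONDS, 100);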

You may need to increase the checkpoint timeout. If at some point the checkpoint duration becomes truly problematic, you could switch to using the RocksDB state backend, in order to be able to use incremental checkpointing (but whether this will help depends on what's going on). Or you could change the state TTL configuration to purge state more quickly.
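Roughly, those knobs look like this (the interval, timeout, and checkpoint URI are placeholders, and the RocksDB backend requires the flink-statebackend-rocksdb dependency):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 5 minutes, and allow up to 10 minutes before a checkpoint times out.
env.enableCheckpointing(5 * 60 * 1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointTimeout(10 * 60 * 1000);
checkpointConfig.setMinPauseBetweenCheckpoints(60 * 1000);

// RocksDB keeps working state off the JVM heap and, with the second constructor
// argument set to true, writes incremental checkpoints (constructor declares IOException).
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true));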
