Flink keyBy adding delay; how can I reduce this latency?


Problem description

When I run a simple Flink application with a KeyedStream, the latency of an event varies from 0 to 100 ms. Here is the program:

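import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// addSink returns a DataStreamSink, so the pipeline is not assigned to a DataStream variable.
env.addSource(new SourceFunction<Long>() {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> sourceContext) throws Exception {
        while (running) {
            // Emit the current wall-clock time while holding the checkpoint lock.
            synchronized (sourceContext.getCheckpointLock()) {
                sourceContext.collect(System.currentTimeMillis());
            }
            // Sleep outside the lock so that checkpoints are not blocked for a second.
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}).keyBy(new KeySelector<Long, Long>() {
    @Override
    public Long getKey(Long l) throws Exception {
        return l;
    }
}).addSink(new SinkFunction<Long>() {
    @Override
    public void invoke(Long l) throws Exception {
        // Latency: time at the sink minus the timestamp emitted by the source.
        long diff = System.currentTimeMillis() - l;
        System.out.println("in Sink: diff=" + diff);
    }
});
env.execute();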

The output is:

in Sink: diff=0
in Sink: diff=2
in Sink: diff=4
in Sink: diff=4
in Sink: diff=5
in Sink: diff=7
in Sink: diff=9
in Sink: diff=9
in Sink: diff=11
in Sink: diff=12
in Sink: diff=14
in Sink: diff=14
in Sink: diff=16
in Sink: diff=17
in Sink: diff=18
in Sink: diff=19
in Sink: diff=21
in Sink: diff=22
in Sink: diff=24
in Sink: diff=24
in Sink: diff=26
in Sink: diff=27
in Sink: diff=29
in Sink: diff=29
in Sink: diff=31
in Sink: diff=32
in Sink: diff=34
in Sink: diff=34
in Sink: diff=36
in Sink: diff=37
in Sink: diff=39
in Sink: diff=40
in Sink: diff=41
in Sink: diff=43
in Sink: diff=45
in Sink: diff=45
in Sink: diff=47
in Sink: diff=48
in Sink: diff=50
in Sink: diff=50
in Sink: diff=52
in Sink: diff=53
in Sink: diff=55
in Sink: diff=57
in Sink: diff=57
in Sink: diff=59
in Sink: diff=60
in Sink: diff=61
in Sink: diff=62
in Sink: diff=63
in Sink: diff=65
in Sink: diff=66
in Sink: diff=67
in Sink: diff=69
in Sink: diff=70
in Sink: diff=72
in Sink: diff=72
in Sink: diff=74
in Sink: diff=76
in Sink: diff=77
in Sink: diff=78
in Sink: diff=79
in Sink: diff=81
in Sink: diff=82
in Sink: diff=83
in Sink: diff=84
in Sink: diff=86
in Sink: diff=87
in Sink: diff=88
in Sink: diff=89
in Sink: diff=91
in Sink: diff=92
in Sink: diff=94
in Sink: diff=94
in Sink: diff=96
in Sink: diff=97
in Sink: diff=99
in Sink: diff=99
in Sink: diff=0
in Sink: diff=2
in Sink: diff=3
in Sink: diff=4
in Sink: diff=4
in Sink: diff=5
in Sink: diff=7
in Sink: diff=9
in Sink: diff=9
in Sink: diff=11
in Sink: diff=12
in Sink: diff=14
in Sink: diff=14
in Sink: diff=16
in Sink: diff=17
in Sink: diff=18
in Sink: diff=19
in Sink: diff=21
in Sink: diff=22
in Sink: diff=24
in Sink: diff=24
in Sink: diff=26
in Sink: diff=46
in Sink: diff=48
in Sink: diff=50
in Sink: diff=52
in Sink: diff=53
in Sink: diff=54
in Sink: diff=56
in Sink: diff=58
in Sink: diff=59
in Sink: diff=60
in Sink: diff=62
in Sink: diff=64
in Sink: diff=65
in Sink: diff=66
in Sink: diff=68
in Sink: diff=70
in Sink: diff=71
in Sink: diff=73
in Sink: diff=74
in Sink: diff=76
in Sink: diff=77
in Sink: diff=79
in Sink: diff=81
in Sink: diff=82
in Sink: diff=83
in Sink: diff=85
in Sink: diff=86
in Sink: diff=88
in Sink: diff=88
in Sink: diff=90
in Sink: diff=92
in Sink: diff=92
in Sink: diff=94
in Sink: diff=95
in Sink: diff=97
in Sink: diff=98
in Sink: diff=99
in Sink: diff=0
in Sink: diff=2
in Sink: diff=4
in Sink: diff=4
in Sink: diff=5
in Sink: diff=7
in Sink: diff=9

As you can see, the latency follows a pattern: it gradually increases to about 100 ms, then drops back to 0, and the cycle repeats. I need the latency to be as low as possible. This example is a simplified version of my real application. Can someone explain the reason for the latency and how to reduce it as much as possible?

Recommended answer

The reason for this delay is that by adding the keyBy you are forcing a network shuffle, along with serialization and deserialization. The delay is so variable because of network buffering.

You'll want to read the section of the documentation called Controlling Latency. The tl;dr is that you want to set the network buffer timeout to something small:

env.setBufferTimeout(timeoutMillis);

You can set the buffer timeout to zero if you want, but that will hurt throughput more than setting it to something small (such as 1 ms or 5 ms). The default is 100 ms. For details on how the network stack in Flink is organized, see A Deep-Dive into Flink's Network Stack on the Flink project blog.
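To make the link between the buffer timeout and the sawtooth in the question concrete, here is a minimal sketch of a simplified model. It is not Flink's implementation: it assumes the output buffer is flushed at exact multiples of the timeout and never fills up earlier, and the class and method names are made up for illustration.

```java
// Simplified model, not Flink's implementation: assume the output buffer is
// flushed at exact multiples of the buffer timeout and never fills up first.
final class BufferTimeoutModel {

    /** Time a record emitted at emitTimeMs waits for the next periodic flush. */
    static long waitForFlush(long emitTimeMs, long timeoutMs) {
        if (timeoutMs <= 0) {
            return 0; // a timeout of 0 is modelled as "flush after every record"
        }
        long sinceLastFlush = emitTimeMs % timeoutMs;
        return sinceLastFlush == 0 ? 0 : timeoutMs - sinceLastFlush;
    }

    /** Worst wait over `records` records emitted every periodMs milliseconds. */
    static long worstWait(long periodMs, long timeoutMs, int records) {
        long worst = 0;
        for (int i = 0; i < records; i++) {
            worst = Math.max(worst, waitForFlush(i * periodMs, timeoutMs));
        }
        return worst;
    }

    public static void main(String[] args) {
        // One record roughly every 1001 ms, so its phase drifts against the flush cycle.
        System.out.println("timeout=100ms worst wait: " + worstWait(1001, 100, 200));
        System.out.println("timeout=5ms   worst wait: " + worstWait(1001, 5, 200));
    }
}
```

In this model the wait sweeps through the whole range from 0 to just under the timeout as the emit time drifts against the flush cycle, which is the shape of the pattern in the question. Whether the measured values climb or fall depends on the direction of the drift. A smaller timeout shrinks the range directly.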

While we're on the subject, other sources of latency can include checkpoint barrier alignment and garbage collection.

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

will disable barrier alignment, at the cost of giving up exactly-once processing semantics.

Using the RocksDB state backend will reduce the number of objects to garbage collect (since RocksDB keeps its state off-heap), in some cases improving worst-case latency at the cost of worse average latency. However, with modern garbage collectors, using RocksDB to improve worst-case latency can be a mistake.

Also,

env.getConfig().enableObjectReuse();

will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when user-code functions are not written with this behavior in mind.
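The kind of bug meant here can be shown with a plain-Java analogy. This is only a sketch: the `Event` class and the loop that plays the role of the runtime are hypothetical and involve no Flink code. The point is that user code which keeps a reference to a record sees it change once the same instance is reused for the next record.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogy only: "Event" and the reuse loops are hypothetical and
// stand in for a runtime that hands the same mutable instance to user code.
final class ObjectReuseDemo {

    static final class Event {
        long value;
    }

    /** Keeps references to the incoming object; breaks when the instance is reused. */
    static List<Long> bufferWithoutCopy(long[] inputs) {
        Event reused = new Event();
        List<Event> buffered = new ArrayList<>();
        for (long in : inputs) {
            reused.value = in;    // the "runtime" overwrites the same object
            buffered.add(reused); // user code stores a reference to it
        }
        return values(buffered);
    }

    /** Copies each record before buffering it, so later reuse cannot change it. */
    static List<Long> bufferWithCopy(long[] inputs) {
        Event reused = new Event();
        List<Event> buffered = new ArrayList<>();
        for (long in : inputs) {
            reused.value = in;
            Event copy = new Event();
            copy.value = reused.value;
            buffered.add(copy);
        }
        return values(buffered);
    }

    private static List<Long> values(List<Event> events) {
        List<Long> out = new ArrayList<>();
        for (Event e : events) {
            out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] inputs = {1, 2, 3};
        System.out.println("without copy: " + bufferWithoutCopy(inputs));
        System.out.println("with copy:    " + bufferWithCopy(inputs));
    }
}
```

Without the copy, every buffered entry ends up holding the last value written. With the copy, the original values survive, at the cost of one extra allocation per record.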

If you are using watermarks, the watermark delay affects the latency with which event-time timers (including windows) are triggered, and the autoWatermarkInterval also has an impact on latency.
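As a rough illustration of how these two settings add up, here is a small sketch of a simplified model. The assumptions are strong and are mine, not the answer's: events arrive in order, one per millisecond, with event time equal to processing time; a watermark of (latest timestamp - delay - 1) is emitted at exact multiples of the watermark interval; and a window ending at `windowEndMs` fires once the watermark reaches `windowEndMs - 1`. The class and method names are hypothetical.

```java
// Simplified model under the assumptions stated above; not Flink code.
final class WatermarkLagModel {

    /**
     * Milliseconds between the end of a window and the periodic watermark
     * tick that lets it fire, for a given watermark delay and emit interval.
     */
    static long firingLag(long windowEndMs, long delayMs, long intervalMs) {
        // The watermark passes windowEndMs - 1 once an event with
        // timestamp >= windowEndMs + delayMs has been seen.
        long earliest = windowEndMs + delayMs;
        // The watermark is only emitted on the next periodic tick at or after that.
        long firstTick = ((earliest + intervalMs - 1) / intervalMs) * intervalMs;
        return firstTick - windowEndMs;
    }

    public static void main(String[] args) {
        // Example values only: a window ending at 1000 ms and a 50 ms watermark delay.
        System.out.println("interval=200ms lag: " + firingLag(1000, 50, 200));
        System.out.println("interval=10ms  lag: " + firingLag(1000, 50, 10));
    }
}
```

Under these assumptions the lag is never smaller than the watermark delay, and a coarse emit interval can add up to one more interval on top of it.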

Finally, the use of transactional sinks adds end-to-end latency, since downstream consumers of those sinks won't see the results until the transactions are committed. The expected delay is roughly half the checkpoint interval.
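The "half the checkpoint interval" figure can be derived with a short back-of-the-envelope calculation. It assumes that records arrive uniformly over a checkpoint interval of length T, that a transaction is committed exactly at the end of that interval, and that the duration of the checkpoint itself is negligible. A record arriving at time t within the interval then waits D = T - t, and the mean wait is:

```latex
\mathbb{E}[D] \;=\; \frac{1}{T}\int_{0}^{T} (T - t)\,dt \;=\; \frac{1}{T}\cdot\frac{T^{2}}{2} \;=\; \frac{T}{2}
```

For example, with a checkpoint interval of 10 seconds, this model gives an average added delay of about 5 seconds and a worst case of about 10 seconds.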

If you are interested in measuring latency, take a look at Latency Tracking and the section on latency in Monitoring Apache Flink Applications 101.
