Event time window on Kafka source streaming

Problem description

There is a topic on a Kafka server. Our program reads this topic as a stream and assigns event timestamps, then applies a window operation to the stream. But the program doesn't work: after debugging, it seems that the processWatermark method of WindowOperator is never executed. Here is my code.

    DataStream<Tuple2<String, Long>> advertisement = env
            .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                private static final long serialVersionUID = -6564495005753073342L;

                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] splits = value.split(" ");
                    return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
                }
            }).assignTimestamps(timestampExtractor);

    advertisement
            .keyBy(keySelector)
            .window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
            .apply(new WindowFunction<Tuple2<String,Long>, Integer, String, TimeWindow>() {
                private static final long serialVersionUID = 5151607280638477891L;
                @Override
                public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<Integer> out) throws Exception {
                    out.collect(Iterables.size(values));
                }
            }).print();

Why does this happen? If I add "keyBy(keySelector)" before "assignTimestamps(timestampExtractor)", the program works. Could anyone help explain the reason?

Recommended answer

You are affected by a known bug in Flink: FLINK-3121: Watermark forwarding does not work for sources not producing any data.

The problem is that there are more FlinkKafkaConsumer instances running (most likely as many as you have CPU cores, say 4) than you have partitions (1). Only one of the Kafka consumers is emitting watermarks; the other consumers are idling.

The window operator is not aware of that and keeps waiting for watermarks to arrive from all consumers. That's why the windows never trigger.
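
To make the stalling concrete, here is a small toy model in plain Java, not Flink's actual WindowOperator code. It assumes the downstream operator advances its event-time clock to the minimum watermark seen across all input channels, which matches the behavior described above. The names WatermarkMinDemo and MinWatermarkTracker are made up for this illustration.

```java
import java.util.Arrays;

/** Toy model of how an operator with several input channels combines watermarks. */
public class WatermarkMinDemo {

    /** Hypothetical helper: tracks the latest watermark seen on each input channel. */
    static final class MinWatermarkTracker {
        private final long[] channelWatermarks;
        private long currentWatermark = Long.MIN_VALUE;

        MinWatermarkTracker(int numChannels) {
            channelWatermarks = new long[numChannels];
            // A channel that has never sent a watermark stays at Long.MIN_VALUE.
            Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        }

        /** Records a watermark from one channel; returns true if the combined watermark advanced. */
        boolean onWatermark(int channel, long timestamp) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            if (min > currentWatermark) {
                currentWatermark = min;
                return true;
            }
            return false;
        }

        long currentWatermark() {
            return currentWatermark;
        }
    }

    public static void main(String[] args) {
        // Four source subtasks, but only subtask 0 owns the single Kafka partition.
        MinWatermarkTracker fourSources = new MinWatermarkTracker(4);
        boolean advanced = fourSources.onWatermark(0, 10_000L);
        System.out.println("4 sources, 1 active: advanced=" + advanced);

        // With a single source subtask, the same watermark moves event time forward.
        MinWatermarkTracker oneSource = new MinWatermarkTracker(1);
        advanced = oneSource.onWatermark(0, 10_000L);
        System.out.println("1 source: advanced=" + advanced);
    }
}
```

In this model the three idle channels pin the minimum at Long.MIN_VALUE, so the 10-second window never sees event time pass its end. With one channel per partition the watermark advances as expected.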
