Event time window on Kafka source streaming

Problem description

There is a topic on our Kafka server. In the program, we read this topic as a stream and assign event timestamps, then apply a window operation to the stream. But the program doesn't work: after debugging, it seems that the processWatermark method of WindowOperator is never executed. Here is my code:

    DataStream<Tuple2<String, Long>> advertisement = env
            .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                private static final long serialVersionUID = -6564495005753073342L;

                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] splits = value.split(" ");
                    return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
                }
            }).assignTimestamps(timestampExtractor);

    advertisement
            .keyBy(keySelector)
            .window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
            .apply(new WindowFunction<Tuple2<String,Long>, Integer, String, TimeWindow>() {
                private static final long serialVersionUID = 5151607280638477891L;
                @Override
                public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<Integer> out) throws Exception {
                    out.collect(Iterables.size(values));
                }
            }).print();

Why does this happen? If I add "keyBy(keySelector)" before "assignTimestamps(timestampExtractor)", the program works. Could anyone help explain the reason?

Recommended answer

You are affected by a known bug in Flink: FLINK-3121, "Watermark forwarding does not work for sources not producing any data".

The problem is that more FlinkKafkaConsumer instances are running (most likely as many as you have CPU cores, say 4) than the topic has partitions (1). Only the one Kafka consumer that reads the partition emits watermarks; the other consumers are idle.
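
To see why three of the four consumers end up idle, here is a minimal sketch of spreading partitions over source subtasks. It assumes a simple round-robin rule (partition p goes to subtask p % parallelism); `PartitionAssignmentSketch` and `partitionsFor` are hypothetical names for illustration, not the Flink connector's API, and the connector's exact assignment formula may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Hypothetical helper: the Kafka partitions one consumer subtask would read,
    // ASSUMING a round-robin rule (partition p -> subtask p % parallelism).
    static List<Integer> partitionsFor(int subtaskIndex, int parallelism, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % parallelism == subtaskIndex) {
                assigned.add(p);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        int parallelism = 4;   // e.g. one source subtask per CPU core
        int numPartitions = 1; // the "advertisement" topic has a single partition
        for (int i = 0; i < parallelism; i++) {
            List<Integer> parts = partitionsFor(i, parallelism, numPartitions);
            // A subtask with no partitions reads no records, so it never advances a watermark
            System.out.println("subtask " + i + " -> " + parts + (parts.isEmpty() ? " (idle)" : ""));
        }
    }
}
```

Under this assumption, only subtask 0 gets partition 0 and subtasks 1 to 3 are printed as idle. With as many partitions as subtasks, every subtask would receive at least one partition.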

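The window operator is not aware of this and waits for watermarks to arrive from all consumers: its event time is the minimum of the watermarks received on all of its input channels, so a consumer that never emits a watermark holds that minimum back indefinitely. That's why the windows never trigger.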
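
The following is a simplified model, not Flink code, of the effect described above. It assumes the window operator's event time is the minimum watermark over its input channels, that a channel which has never received a watermark sits at `Long.MIN_VALUE`, and that a tumbling event-time window `[start, end)` fires once the watermark reaches `end - 1`. The class and method names are hypothetical.

```java
public class WatermarkMinSketch {

    // Simplified model: the operator's event-time clock is the minimum
    // watermark across all of its input channels.
    static long combinedWatermark(long[] channelWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    // A tumbling event-time window [start, end) fires once the
    // watermark reaches its last timestamp, end - 1.
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // first 10-second window: [0, 10000) in milliseconds

        // Four source channels: only channel 0 reads the single partition and has
        // advanced to 15000; the idle ones never emit a watermark (Long.MIN_VALUE).
        long[] withIdleSources = {15_000L, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE};
        long wm1 = combinedWatermark(withIdleSources);
        System.out.println("idle sources:  watermark=" + wm1 + ", fires=" + windowFires(windowEnd, wm1));

        // Source parallelism equal to the partition count: every channel reports progress.
        long[] allActive = {15_000L};
        long wm2 = combinedWatermark(allActive);
        System.out.println("single source: watermark=" + wm2 + ", fires=" + windowFires(windowEnd, wm2));
    }
}
```

In this model the idle channels pin the combined watermark at `Long.MIN_VALUE`, so the `[0, 10000)` window never fires, while a single active channel lets it fire. Matching the source parallelism to the number of partitions is one way to avoid idle channels under these assumptions.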
