Flink time window based on event time outputs nothing

Problem description

In a stream processing problem, we have 3 sensors, each generating a timestamped sample every 8 milliseconds (the sensors' clocks are synchronized). I want to merge the data for every timestamp (with 3 sensors, the 3 samples sharing a timestamp should be merged into the output for that timestamp). In addition, there is a time limit of 160 milliseconds: every sample must be output no later than 160 milliseconds after its generation timestamp. So I decided to use Flink's event time concept and a time window. Because the timestamp is unique within each sensor's samples, we use it as the key of the data stream.

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<Tuple3<String, Long, JSONObject>> res = aggregatedTuple
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, JSONObject>>(Time.milliseconds(160)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, Long, JSONObject> element) {
                        return element.f1; // second field holds the event timestamp
                    }
                })
        .keyBy(1) // key by the timestamp field
        .timeWindow(Time.milliseconds(160))
        .allowedLateness(Time.milliseconds(2))
        .sideOutputLateData(lateOutputTag)
        .reduce(/* merge samples that share the same key */);

In the code, we first declare the second field of the stream as the event time and set a periodic watermark (since data is generated at a fixed rate with a fixed delay). After that, we key the stream by the event time. We also want to collect late data, so we use sideOutputLateData. Finally, we reduce (merge) the data with the same key. The problem is that, in Flink's event time mode, the defined window does not output any data! Without setting event time it does output data, but I want to use event time for Flink windowing. I tried several window sizes and watermark settings, but none of them produced any output.
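To make the timing concrete, here is a minimal plain-Java sketch (no Flink dependency) of the arithmetic behind the setup above. It assumes the usual semantics of the classes used in the question: a tumbling window with offset 0 starts at `timestamp - timestamp % size`, the bounded-out-of-orderness extractor emits `maxSeenTimestamp - bound` as its watermark, and an event-time window fires once the watermark reaches `windowEnd - 1`. The class and method names are hypothetical and exist only for illustration; this is not Flink's own code.

```java
/** Toy arithmetic for the question's setup; plain Java, hypothetical names, no Flink dependency. */
final class EventTimeWindowMath {
    static final long WINDOW_SIZE_MS = 160L;          // timeWindow(Time.milliseconds(160))
    static final long MAX_OUT_OF_ORDERNESS_MS = 160L; // bound passed to the timestamp extractor
    static final long SAMPLE_PERIOD_MS = 8L;          // one sample per sensor every 8 ms

    /** Start of the tumbling window (offset 0) that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Last timestamp that still belongs to the window, i.e. window end minus 1. */
    static long windowMaxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    /** Assumed watermark rule: highest timestamp seen so far minus the out-of-orderness bound. */
    static long watermark(long maxSeenTimestamp, long bound) {
        return maxSeenTimestamp - bound;
    }

    /** Timestamp of the first sample whose watermark allows the window of firstSample to fire. */
    static long firstFiringSample(long firstSample) {
        long target = windowMaxTimestamp(firstSample, WINDOW_SIZE_MS);
        long ts = firstSample;
        while (watermark(ts, MAX_OUT_OF_ORDERNESS_MS) < target) {
            ts += SAMPLE_PERIOD_MS; // next sample arrives 8 ms later in event time
        }
        return ts;
    }

    public static void main(String[] args) {
        long first = 1531980773390L; // first timestamp from the question's sample data
        long firing = firstFiringSample(first);
        System.out.println("window = [" + windowStart(first, WINDOW_SIZE_MS) + ", "
                + (windowMaxTimestamp(first, WINDOW_SIZE_MS) + 1) + ")");
        System.out.println("fires on sample " + firing + ", "
                + (firing - first) + " ms of event time after the first sample");
    }
}
```

Under these assumptions, the window holding the sample stamped 1531980773390 can fire only once the sample stamped 1531980773606 has been seen, that is 216 ms of event time later. With a 160 ms window and a 160 ms out-of-orderness bound, results therefore cannot meet a 160 ms output deadline, even before the wall-clock delay of periodic watermark emission is added.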

I successfully solved the same problem using a Flink count window and a customized timeout trigger.

Update: the incoming data stream has the following format (with 3 sensors):

sensor_id, timestamp, data
(1, 1531980773390, {})
(2, 1531980773390, {})
(3, 1531980773390, {})
(1, 1531980773398, {})
(2, 1531980773398, {})
(3, 1531980773398, {})

and so on, every 8 milliseconds.

Saving late data in a data stream:

final OutputTag<Tuple3<String, Long, JSONObject>> lateOutputTag =
        new OutputTag<Tuple3<String, Long, JSONObject>>("late-data") {};
DataStream<Tuple3<String, Long, JSONObject>> lateData = res.getSideOutput(lateOutputTag);

Recommended answer

Could it be that everything is ending up late?

By default, periodic watermark generators are called every 200 ms (measured in real time, not event time). That may be rather long in your case. Use setAutoWatermarkInterval to change this. You may also want to consider using setBufferTimeout to reduce latency.
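As a rough sketch of where those two settings go, the fragment below configures a StreamExecutionEnvironment of the same API generation as the question's code. The values 8 ms and 5 ms are illustrative assumptions, not recommendations from the answer, and the class name is hypothetical. As far as I know, setStreamTimeCharacteristic(TimeCharacteristic.EventTime) itself sets the watermark interval to the 200 ms default, so the override is placed after that call.

```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Configuration sketch only; the class name and the chosen values are assumptions. */
public class LowLatencyEventTimeConfig {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event time first: this call is understood to apply the 200 ms default interval.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Ask periodic watermark generators for a watermark every 8 ms of wall-clock time
        // (assumed value, chosen here to match the sensors' sample period).
        env.getConfig().setAutoWatermarkInterval(8L);

        // Flush network buffers after at most 5 ms instead of waiting for them to fill
        // (assumed value; lower values trade throughput for latency).
        env.setBufferTimeout(5L);

        // ... build the pipeline from the question here, then call env.execute(...).
    }
}
```

A shorter watermark interval only makes watermarks advance more often; it does not shrink the out-of-orderness bound passed to the timestamp extractor, which still delays every watermark by that amount of event time.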
