Apache Flink Tumbling Window delayed result


Question

I ran into an issue with an Apache Flink application that uses a tumbling window. The window size is 10 seconds (the query below actually uses INTERVAL '20' SECOND), and I expect the resultSet DataStream to emit once per window. However, the result of the latest window is always delayed until I push further data into the source stream.

For example, if I push several records into the source stream between '01:33:40.0' and '01:34:00.0' and then stop and watch the log, nothing happens.

When I push some data again at '01:37:XX', I then get the resultSet for the window between '01:33:40.0' and '01:34:00.0'. This is not what I want, because the downstream sink logic expects the resultSet on time.

Any hints on how to improve this would be very much appreciated. Thanks.

Here is the log:

"log timestamp": "2019-11-15 01:37:45",
"message": "resultSet output: CLASS: 13 CNT: 1 from: 2019-11-15 01:33:40.0 to: 2019-11-15 01:34:00.0\n",

Here is the code snippet:

// Count records per railway class in 20-second event-time tumbling windows.
// rowtime is the event-time attribute of the Inputs table.
Table resultTable = tableEnv.sqlQuery(
    "SELECT " +
    "  CAST (N02_001 AS VARCHAR(10)) AS RAILWAY_CLASS, " +
    "  COUNT(*) RAILWAY_CLASS_COUNT, " +
    "  TUMBLE_START(rowtime, INTERVAL '20' SECOND) as WINDOW_START, " +
    "  TUMBLE_END(rowtime, INTERVAL '20' SECOND) as WINDOW_END " +
    " FROM Inputs " +
    " GROUP BY TUMBLE(rowtime, INTERVAL '20' SECOND), CAST (N02_001 AS VARCHAR(10))");


TupleTypeInfo<Tuple4<String, Long, Timestamp, Timestamp>> tupleType = new TupleTypeInfo<>(
    Types.STRING,
    Types.LONG,
    Types.SQL_TIMESTAMP,
    Types.SQL_TIMESTAMP);

DataStream<Tuple4<String, Long, Timestamp, Timestamp>> resultSet = tableEnv.toAppendStream(resultTable, tupleType);

resultSet
.map((Tuple4<String, Long, Timestamp, Timestamp> value) -> {
    String output = "CLASS: " + value.f0 + " CNT: " + value.f1 + " from: " + value.f2 + " to: " + value.f3 + "\n";
    log.warn("resultSet output: " + output);
    return value;
})
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP));

Recommended answer

This is the expected behavior. You are using event time (EventTime), which means that the watermarks that close windows and track the passage of time in the application are derived from event timestamps. If no events arrive, time does not advance, and so no windows are emitted. That is what you are observing.
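To make the mechanism concrete, here is a minimal plain-Java sketch of an event-time tumbling window. It is a toy model and not Flink code: the class TumblingWindowSketch and its methods are hypothetical names, the watermark is assumed to equal the highest event timestamp seen so far, and late-event handling is omitted. A window fires once the watermark reaches the window end minus one millisecond, which is the rule Flink's event-time trigger applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of an event-time tumbling window (hypothetical, not Flink code). */
class TumblingWindowSketch {
    private final long sizeMs;
    // Window start timestamp -> number of events buffered in that window.
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();
    // Assumption: the watermark is simply the highest event timestamp seen.
    private long watermark = Long.MIN_VALUE;

    TumblingWindowSketch(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    /** Buffers one event, advances the watermark, and returns the windows that fire. */
    List<String> onEvent(long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, sizeMs);
        countsByWindowStart.merge(start, 1L, Long::sum);
        watermark = Math.max(watermark, timestampMs);
        return fireReadyWindows();
    }

    /** Emits every buffered window whose last millisecond is covered by the watermark. */
    private List<String> fireReadyWindows() {
        List<String> fired = new ArrayList<>();
        while (!countsByWindowStart.isEmpty()
                && countsByWindowStart.firstKey() + sizeMs - 1 <= watermark) {
            Map.Entry<Long, Long> e = countsByWindowStart.pollFirstEntry();
            fired.add("[" + e.getKey() + "," + (e.getKey() + sizeMs) + ") count=" + e.getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        TumblingWindowSketch window = new TumblingWindowSketch(20_000L);
        // Two events inside the first 20-second window: nothing fires yet.
        System.out.println(window.onEvent(5_000L));
        System.out.println(window.onEvent(15_000L));
        // Only a much later event moves the watermark past the window end.
        System.out.println(window.onEvent(240_000L));
    }
}
```

In this model nothing can fire between calls to onEvent, because the watermark moves only when an event arrives. That matches the log in the question, where the 01:33:40 to 01:34:00 window appears only after new data is pushed at 01:37.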

The behavior you are seeing most probably comes from using AssignerWithPunctuatedWatermarks, which emits a timestamp and a watermark for each event. If you switch to AssignerWithPeriodicWatermarks, its getCurrentWatermark() method is called at a fixed interval even when no data is arriving, so it can generate a watermark that closes and emits the window. Note that this only helps if the returned watermark keeps advancing while the source is idle (for example, by deriving it from the wall clock); an assigner that returns only the highest timestamp seen so far will stall in the same way.
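One way to let a periodic watermark advance while the source is idle can be sketched as follows. This is plain Java and not Flink code: IdleAwareWatermarkSketch, its fields, and the two timeout parameters are hypothetical, and the wall clock is passed in explicitly so the logic is easy to follow. It assumes that event time and wall-clock time progress at roughly the same rate. In a real job, logic like this would sit behind the periodically invoked getCurrentWatermark() of an AssignerWithPeriodicWatermarks implementation.

```java
/** Plain-Java sketch of an idle-aware periodic watermark (hypothetical, not Flink code). */
class IdleAwareWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private long maxEventTs = Long.MIN_VALUE;   // highest event timestamp seen so far
    private long lastEventSeenAtMs;             // wall-clock time of the most recent event
    private long lastWatermark = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Called for every event; nowMs is the wall-clock time at arrival. */
    void onEvent(long eventTs, long nowMs) {
        maxEventTs = Math.max(maxEventTs, eventTs);
        lastEventSeenAtMs = nowMs;
    }

    /** Called on a timer, whether or not events are arriving. */
    long currentWatermark(long nowMs) {
        if (maxEventTs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no event seen yet, nothing to base a watermark on
        }
        long idleForMs = nowMs - lastEventSeenAtMs;
        // Once the source has been silent for longer than the timeout,
        // assume event time moved forward by the length of the silence.
        long advanceMs = idleForMs > idleTimeoutMs ? idleForMs : 0L;
        long candidate = maxEventTs + advanceMs - maxOutOfOrdernessMs;
        // A watermark must never move backwards.
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch wm = new IdleAwareWatermarkSketch(1_000L, 5_000L);
        wm.onEvent(19_000L, 100_000L);
        System.out.println(wm.currentWatermark(100_200L)); // source still active
        System.out.println(wm.currentWatermark(130_000L)); // source idle for 30 seconds
    }
}
```

The trade-off is that any event arriving after the idle advance with a timestamp below the watermark counts as late, so the idle timeout should be longer than any pause you expect from a source that is merely slow.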
