How to sort a stream by event time using Flink SQL
Question
I have an out-of-order DataStream<Event> that I want to sort so that the events are ordered by their event time timestamps. I've simplified my use case down to where my Event class has just a single field, the timestamp field:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
            .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

    Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
    tableEnv.registerTable("events", events);
    Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
    DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

    sortedEventStream.print();
    env.execute();
}
I'm getting this error:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "timestamp FROM" at line 1, column 8.
It seems like I'm not specifying the event time attribute correctly, but it's not clear what's wrong.
Recommended answer
The problem turned out to be using timestamp as a field name in my Event class. TIMESTAMP is a reserved keyword in SQL, so the parser rejects it as a bare column name, which is why the error points at "timestamp FROM". Changing the field name to eventTime was enough to get everything working:
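import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Instant;
import java.util.Random;

public class Sort {
    // Maximum lateness of an event, in milliseconds
    public static final int OUT_OF_ORDERNESS = 1000;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
                .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

        // Expose eventTime as the table's rowtime (event time) attribute
        Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
        tableEnv.registerTable("events", events);
        Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
        DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

        sortedEventStream.print();
        env.execute();
    }

    public static class Event {
        public Long eventTime;

        Event() {
            // Current time plus a random offset, so events are generated out of order
            this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
        }
    }

    // Emits a new Event roughly every millisecond until cancelled
    private static class OutOfOrderEventSource implements SourceFunction<Event> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            while (running) {
                ctx.collect(new Event());
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    // Watermarks trail the largest timestamp seen so far by OUT_OF_ORDERNESS milliseconds
    private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
        public TimestampsAndWatermarks() {
            super(Time.milliseconds(OUT_OF_ORDERNESS));
        }

        @Override
        public long extractTimestamp(Event event) {
            return event.eventTime;
        }
    }
}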
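To get a feel for what sorting by event time involves, here is a minimal, Flink-free sketch of the general idea: buffer incoming timestamps, track a watermark that trails the largest timestamp seen by a fixed bound (loosely mirroring what BoundedOutOfOrdernessTimestampExtractor does above), and release buffered events once the watermark passes them. The class and method names (EventTimeSortSketch, sortByEventTime) are made up for illustration, and dropping late events is an assumption of this sketch; it is not a description of how Flink implements ORDER BY internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative sketch only; names are hypothetical and this is not Flink's implementation.
class EventTimeSortSketch {

    // Returns the timestamps in event-time order, assuming each one arrives
    // at most maxOutOfOrderness milliseconds behind the largest seen so far.
    static List<Long> sortByEventTime(List<Long> arrivals, long maxOutOfOrderness) {
        PriorityQueue<Long> buffer = new PriorityQueue<>(); // min-heap on timestamp
        List<Long> output = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : arrivals) {
            if (ts < watermark) {
                continue; // too late to emit in order; dropped in this sketch
            }
            buffer.add(ts);
            maxSeen = Math.max(maxSeen, ts);
            // Nothing older than the watermark is expected to arrive anymore
            watermark = maxSeen - maxOutOfOrderness;
            // Everything at or below the watermark can be emitted safely
            while (!buffer.isEmpty() && buffer.peek() <= watermark) {
                output.add(buffer.poll());
            }
        }
        // The input here is bounded, so flush whatever is still buffered
        while (!buffer.isEmpty()) {
            output.add(buffer.poll());
        }
        return output;
    }

    public static void main(String[] args) {
        List<Long> arrivals = Arrays.asList(1000L, 1300L, 1100L, 1900L, 1500L, 2600L);
        System.out.println(sortByEventTime(arrivals, 500));
        // prints [1000, 1100, 1300, 1500, 1900, 2600]
    }
}
```

The sketch also shows why sorted output lags behind the input: an event can only be emitted once the watermark guarantees that nothing earlier is still on its way, so a larger out-of-orderness bound means more buffering and higher latency.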