如何使用Flink SQL按事件时间对流进行排序 [英] How to sort a stream by event time using Flink SQL
问题描述
我有一个要排序的乱序DataStream<Event>
,以便按事件的时间时间戳对事件进行排序.我将用例简化到事件类只有一个字段-timestamp
字段:
I have an out-of-order DataStream<Event>
that I want to sort so that the events are ordered by their event time timestamps. I've simplified my use case down to where my Event class has just a single field -- the timestamp
field:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
tableEnv.registerTable("events", events);
Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
sortedEventStream.print();
env.execute();
}
我收到此错误:
线程主"中的异常 org.apache.flink.table.api.SqlParserException:SQL解析失败. 在第1行第8列遇到"timestamp FROM".
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "timestamp FROM" at line 1, column 8.
似乎我没有以正确的方式指定事件时间属性,但尚不清楚出了什么问题.
Seems like I'm not specifying the event time attribute in the correct way, but it's not clear what's wrong.
推荐答案
该问题原来是在我的Event类中使用timestamp
作为字段名称.将其更改为eventTime
足以使所有工作正常:
The problem turned out to be using timestamp
as a field name in my Event class. Changing it to eventTime
was enough to get everything working:
public class Sort {
public static final int OUT_OF_ORDERNESS = 1000;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
tableEnv.registerTable("events", events);
Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
sortedEventStream.print();
env.execute();
}
public static class Event {
public Long eventTime;
Event() {
this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
}
}
private static class OutOfOrderEventSource implements SourceFunction<Event> {
private volatile boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while(running) {
ctx.collect(new Event());
Thread.sleep(1);
}
}
@Override
public void cancel() {
running = false;
}
}
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
public TimestampsAndWatermarks() {
super(Time.milliseconds(OUT_OF_ORDERNESS));
}
@Override
public long extractTimestamp(Event event) {
return event.eventTime;
}
}
}
这篇关于如何使用Flink SQL按事件时间对流进行排序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!