如何使用 Flink SQL 按事件时间对流进行排序 [英] How to sort a stream by event time using Flink SQL

查看:77
本文介绍了如何使用 Flink SQL 按事件时间对流进行排序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个乱序的DataStream,我想对其进行排序,以便事件按其事件时间时间戳排序.我已经将我的用例简化为我的 Event 类只有一个字段——timestamp 字段:

I have an out-of-order DataStream<Event> that I want to sort so that the events are ordered by their event time timestamps. I've simplified my use case down to where my Event class has just a single field -- the timestamp field:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
            .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

    Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
    tableEnv.registerTable("events", events);
    Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
    DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

    sortedEventStream.print();

    env.execute();
}

我收到此错误:

线程main"中的异常org.apache.flink.table.api.SqlParserException:SQL 解析失败.在第 1 行第 8 列遇到timestamp FROM".

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "timestamp FROM" at line 1, column 8.

好像我没有以正确的方式指定事件时间属性,但不清楚出了什么问题.

Seems like I'm not specifying the event time attribute in the correct way, but it's not clear what's wrong.

推荐答案

问题原来是使用 timestamp 作为我的 Event 类中的字段名称.将其更改为 eventTime 足以让一切正常工作:

The problem turned out to be using timestamp as a field name in my Event class. Changing it to eventTime was enough to get everything working:

public class Sort {
    public static final int OUT_OF_ORDERNESS = 1000;

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
                .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

        Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
        tableEnv.registerTable("events", events);
        Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
        DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

        sortedEventStream.print();

        env.execute();
    }

    public static class Event {
        public Long eventTime;

        Event() {
            this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
        }
    }

    private static class OutOfOrderEventSource implements SourceFunction<Event> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            while(running) {
                ctx.collect(new Event());
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
        public TimestampsAndWatermarks() {
            super(Time.milliseconds(OUT_OF_ORDERNESS));
        }

        @Override
        public long extractTimestamp(Event event) {
            return event.eventTime;
        }
    }
}

这篇关于如何使用 Flink SQL 按事件时间对流进行排序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆