如何使用Flink SQL按事件时间对流进行排序 [英] How to sort a stream by event time using Flink SQL

查看:1801
本文介绍了如何使用Flink SQL按事件时间对流进行排序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个要排序的乱序DataStream<Event>,以便按事件的时间时间戳对事件进行排序.我将用例简化到事件类只有一个字段-timestamp字段:

I have an out-of-order DataStream<Event> that I want to sort so that the events are ordered by their event time timestamps. I've simplified my use case down to where my Event class has just a single field -- the timestamp field:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
            .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

    Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
    tableEnv.registerTable("events", events);
    Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
    DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

    sortedEventStream.print();

    env.execute();
}

我收到此错误:

线程主"中的异常 org.apache.flink.table.api.SqlParserException:SQL解析失败. 在第1行第8列遇到"timestamp FROM".

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "timestamp FROM" at line 1, column 8.

似乎我没有以正确的方式指定事件时间属性,但尚不清楚出了什么问题.

Seems like I'm not specifying the event time attribute in the correct way, but it's not clear what's wrong.

推荐答案

该问题原来是在我的Event类中使用timestamp作为字段名称.将其更改为eventTime足以使所有工作正常:

The problem turned out to be using timestamp as a field name in my Event class. Changing it to eventTime was enough to get everything working:

public class Sort {
    public static final int OUT_OF_ORDERNESS = 1000;

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
                .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

        Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
        tableEnv.registerTable("events", events);
        Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
        DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

        sortedEventStream.print();

        env.execute();
    }

    public static class Event {
        public Long eventTime;

        Event() {
            this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
        }
    }

    private static class OutOfOrderEventSource implements SourceFunction<Event> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            while(running) {
                ctx.collect(new Event());
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
        public TimestampsAndWatermarks() {
            super(Time.milliseconds(OUT_OF_ORDERNESS));
        }

        @Override
        public long extractTimestamp(Event event) {
            return event.eventTime;
        }
    }
}

这篇关于如何使用Flink SQL按事件时间对流进行排序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆