Flink : Rowtime attributes must not be in the input rows of a regular join

Problem description

Using the Flink SQL API, I want to join multiple tables together and do some computation over a time window. I have 3 tables coming from CSV files, and one coming from Kafka. In the Kafka table, I have a field timestampMs, which I want to use for my time window operations.

To do that, I wrote the following code:

StreamExecutionEnvironment env = ... ;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

TableSource table1 = CsvTableSource.builder()
        .path("path/to/file1.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id1", Types.STRING)
        .field("someInfo1", Types.FLOAT)
        .build();

TableSource table2 = CsvTableSource.builder()
        .path("path/to/file2.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id2", Types.STRING)
        .field("someInfo2", Types.STRING)
        .build();

TableSource table3 = CsvTableSource.builder()
        .path("path/to/file3.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id2", Types.STRING)
        .field("id1", Types.STRING)
        .field("someInfo3", Types.FLOAT)
        .build();

tableEnv.registerTableSource("Table1",table1);
tableEnv.registerTableSource("Table2",table2);
tableEnv.registerTableSource("Table3",table3);


Schema schemaExt = new Schema().schema(SOME_SCHEMA);
schemaExt = schemaExt.field("rowtime", Types.SQL_TIMESTAMP).rowtime(new Rowtime().timestampsFromField("timestampMs").watermarksPeriodicBounded(40000));

tableEnv.connect(new Kafka()
        .version("universal")
        .topic(MY_TOPIC)
        .properties(MY_PROPERTIES)
        .sinkPartitionerRoundRobin())
    .withFormat(...)
    .withSchema(schemaExt)
    .inAppendMode()
    .registerTableSource("KafkaInput");

Table joined = tableEnv.sqlQuery("SELECT * FROM Table1 " +
        "join Table3 on Table1.id2 = Table3.id2 " +
        "join Table2 on Table3.id1 = Table2.id1 " +
        "join KafkaInput on Table3.id2 = KafkaInput.id2");

tableEnv.registerTable("Joined", joined);

Table processed = tableEnv.sqlQuery("SELECT id1 FROM Joined " +
        "GROUP BY id1, HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '30' SECOND)");



Sink s = createSink(this.esEndpoint, this.esPattern, this.schemaHandler.getSchemaStr());

tableEnv.registerTableSink("MySink", ...);

processed.insertInto("MySink");

env.execute();

But when I run it, I get the following error:

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

But I don't understand the workaround tip. How can I create a time attribute and do some windowed computation after joining my tables?

--- EDIT ---

In the above code, I replaced the following lines:

Table joined = tableEnv.sqlQuery("SELECT * FROM Table1 " +
        "join Table3 on Table1.id2 = Table3.id2 " +
        "join Table2 on Table3.id1 = Table2.id1 " +
        "join KafkaInput on Table3.id2 = KafkaInput.id2");

tableEnv.registerTable("Joined", joined);

with:

Table staticJoined = tableEnv.sqlQuery("SELECT *, TIMESTAMP('1970-01-01 00:00:00') as rowtime FROM Table1 " +
        "join Table3 on Table1.id2 = Table3.id2 " +
        "join Table2 on Table3.id1 = Table2.id1 ");

TemporalTableFunction temporalFunction = staticJoined.createTemporalTableFunction( "rowtime" , "id2");
tableEnv.registerFunction("CSVData", temporalFunction);

tableEnv.registerTable("Joined",
    tableEnv.sqlQuery("SELECT * FROM KafkaInput, LATERAL TABLE (CSVData(KafkaInput.rowtime)) as Statics WHERE Statics.id2 = KafkaInput.id2")
);

But I get an error with the TemporalTableFunction:

Exception in thread "main" java.lang.AssertionError: Cannot add expression of different type to set:
set type is RecordType(BIGINT genTimestampMs, BIGINT timestampMs, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkConfId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkId, FLOAT density, FLOAT count, FLOAT surface, TIMESTAMP(3) NOT NULL rowtime, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" cameraName, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId00, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId, FLOAT coefficient, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId0, FLOAT thresholdLow, FLOAT thresholdMedium, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId1, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" name, TIMESTAMP(3) rowtime0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" StationName) NOT NULL
expression type is RecordType(BIGINT genTimestampMs, BIGINT timestampMs, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkConfId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkId, FLOAT density, FLOAT count, FLOAT surface, TIMESTAMP(3) NOT NULL rowtime, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" cameraName, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId00, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId, FLOAT coefficient, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId0, FLOAT thresholdLow, FLOAT thresholdMedium, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId1, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" name, TIMESTAMP(0) NOT NULL rowtime0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" StationName) NOT NULL
set is rel#26:LogicalCorrelate.NONE(left=HepRelVertex#24,right=HepRelVertex#25,correlation=$cor0,joinType=inner,requiredColumns={8})
expression is LogicalTemporalTableJoin#32

Two fields do not match between the 'set type' and the 'expression type': TIMESTAMP(3) rowtime0 versus TIMESTAMP(0) NOT NULL rowtime0.

The problem is that I have no field named rowtime0. It looks like an internal field. I don't really understand what's happening here.

Answer

Your query defines regular joins, i.e., joins without a temporal join constraint. Since Flink treats all tables as dynamic (i.e., assumes that they might change in the future), a regular join without time constraints cannot guarantee that rows are emitted (roughly) in timestamp order. However, timestamp order is required for time attributes to ensure that subsequent operations (such as window aggregations) can be performed without fully materializing the stream. Therefore, Flink does not allow time attributes as input (and hence also output) of a regular join that does not preserve the time order.
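For illustration, the cast mentioned in the error message could look like the following sketch (it reuses the table and column names from the question, which is an assumption about the schema). Casting demotes the rowtime attribute to a plain TIMESTAMP column so that the regular join is accepted, but the result then carries no time attribute for later windowing:

// Hedged sketch of the cast workaround from the error message.
// After the cast, "ts" is an ordinary TIMESTAMP column and can no
// longer drive HOP/TUMBLE windows downstream.
Table castJoined = tableEnv.sqlQuery(
        "SELECT Table1.id1, CAST(KafkaInput.rowtime AS TIMESTAMP) AS ts " +
        "FROM Table1 " +
        "JOIN Table3 ON Table1.id2 = Table3.id2 " +
        "JOIN Table2 ON Table3.id1 = Table2.id1 " +
        "JOIN KafkaInput ON Table3.id2 = KafkaInput.id2");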

The problem would not exist if Flink were aware that the tables from the CSV files are fixed and not dynamic. However, this reasoning is not yet supported.

As a workaround, you can model the CSV tables as temporal tables (that are not changing) and join them with the Kafka table, as in the sketch below.
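Here is a minimal sketch of that workaround, assuming the same table names as above and a Flink version that still offers the string-based createTemporalTableFunction API; the constant-rowtime trick and the CAST to TIMESTAMP(3) (intended to avoid the TIMESTAMP(0) vs TIMESTAMP(3) precision mismatch from the edit above) are assumptions to verify against your planner version:

// Hedged sketch, not a verified fix: give the static CSV join a constant
// TIMESTAMP(3) rowtime (the tables never change, so any constant works),
// expose it as a temporal table function keyed by id2, and join it against
// the Kafka stream using the stream's rowtime attribute.
Table staticTables = tableEnv.sqlQuery(
        "SELECT *, CAST('1970-01-01 00:00:00' AS TIMESTAMP(3)) AS statictime " +
        "FROM Table1 " +
        "JOIN Table3 ON Table1.id2 = Table3.id2 " +
        "JOIN Table2 ON Table3.id1 = Table2.id1");

TemporalTableFunction csvData =
        staticTables.createTemporalTableFunction("statictime", "id2");
tableEnv.registerFunction("CSVData", csvData);

// The temporal join keeps KafkaInput.rowtime as a time attribute, so the
// HOP window in the query above remains possible on the join result.
tableEnv.registerTable("Joined", tableEnv.sqlQuery(
        "SELECT * FROM KafkaInput, LATERAL TABLE (CSVData(KafkaInput.rowtime)) AS Statics " +
        "WHERE Statics.id2 = KafkaInput.id2"));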
