Flink: Rowtime attributes must not be in the input rows of a regular join

Question

Using the Flink SQL API, I want to join multiple tables together and do some computation over a time window. I have three tables coming from CSV files, and one coming from Kafka. In the Kafka table, I have a field timestampMs that I want to use for my time window operations.

To do that, I wrote the following code:

StreamExecutionEnvironment env = ...;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

TableSource table1 = CsvTableSource.builder()
        .path("path/to/file1.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id1", Types.STRING)
        .field("someInfo1", Types.FLOAT)
        .build();

TableSource table2 = CsvTableSource.builder()
        .path("path/to/file2.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id2", Types.STRING)
        .field("someInfo2", Types.STRING)
        .build();

TableSource table3 = CsvTableSource.builder()
        .path("path/to/file3.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id2", Types.STRING)
        .field("id1", Types.STRING)
        .field("someInfo3", Types.FLOAT)
        .build();

tableEnv.registerTableSource("Table1",table1);
tableEnv.registerTableSource("Table2",table2);
tableEnv.registerTableSource("Table3",table3);


Schema schemaExt = new Schema().schema(SOME_SCHEMA);
schemaExt = schemaExt.field("rowtime", Types.SQL_TIMESTAMP).rowtime(new Rowtime().timestampsFromField("timestampMs").watermarksPeriodicBounded(40000));

tableEnv.connect(new Kafka()
        .version("universal")
        .topic(MY_TOPIC)
        .properties(MY_PROPERTIES)
        .sinkPartitionerRoundRobin()
)
            .withFormat(...)
            .withSchema(schemaExt)
            .inAppendMode()
            .registerTableSource("KafkaInput");

Table joined = tableEnv.sqlQuery("SELECT * FROM table1 " +
        "join table3 on table1.id2 = table3.id2 " +
        "join table2 on table3.id1 = table2.id1 " +
        "join KafkaInput on table3.id2 = KafkaInput.id2");

tableEnv.registerTable("Joined", joined);

int windowWidth = 5;
int frequency = 2;
Table processed = tableEnv.sqlQuery("SELECT id1 FROM Joined " +
        "GROUP BY id1, HOP(rowtime, INTERVAL '10' SECOND, INTERVAL '30' SECOND)");



Sink s = createSink(this.esEndpoint, this.esPattern, this.schemaHandler.getSchemaStr());


tableEnv.registerTableSink("MySink", ...);

processed.insertInto("MySink");

env.execute();

But when I run it, I get the following error:

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

But I don't understand the workaround tip. How can I create a time attribute and do some windowed computation after joining my tables?

-- EDIT --

In the above code, I replaced the following lines:

Table joined = tableEnv.sqlQuery("SELECT * FROM table1 " +
        "join table3 on table1.id2 = table3.id2 " +
        "join table2 on table3.id1 = table2.id1 " +
        "join KafkaInput on table3.id2 = KafkaInput.id2");

tableEnv.registerTable("Joined", joined);

with:

Table staticJoined = tableEnv.sqlQuery("SELECT *, TIMESTAMP('1970-01-01 00:00:00') as rowtime FROM table1 " +
        "join table3 on table1.id2 = table3.id2 " +
        "join table2 on table3.id1 = table2.id1 ");

TemporalTableFunction temporalFunction = staticJoined.createTemporalTableFunction("rowtime", "id2");
tableEnv.registerFunction("CSVData", temporalFunction);

tableEnv.registerTable("Joined",
    tableEnv.sqlQuery("SELECT * FROM KafkaInput, LATERAL TABLE (CSVData(KafkaInput.rowtime)) as Statics WHERE Statics.id2 = KafkaInput.id2")
);

But I get an error with the TemporalTableFunction:

Exception in thread "main" java.lang.AssertionError: Cannot add expression of different type to set:
set type is RecordType(BIGINT genTimestampMs, BIGINT timestampMs, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkConfId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkId, FLOAT density, FLOAT count, FLOAT surface, TIMESTAMP(3) NOT NULL rowtime, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" cameraName, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId00, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId, FLOAT coefficient, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId0, FLOAT thresholdLow, FLOAT thresholdMedium, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId1, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" name, TIMESTAMP(3) rowtime0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" StationName) NOT NULL
expression type is RecordType(BIGINT genTimestampMs, BIGINT timestampMs, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkConfId, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" sdkId, FLOAT density, FLOAT count, FLOAT surface, TIMESTAMP(3) NOT NULL rowtime, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" cameraName, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" streamId00, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId, FLOAT coefficient, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId0, FLOAT thresholdLow, FLOAT thresholdMedium, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" areaId1, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" name, TIMESTAMP(0) NOT NULL rowtime0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" StationName) NOT NULL
set is rel#26:LogicalCorrelate.NONE(left=HepRelVertex#24,right=HepRelVertex#25,correlation=$cor0,joinType=inner,requiredColumns={8})
expression is LogicalTemporalTableJoin#32

Two fields do not match between the 'set type' and the 'expression type': TIMESTAMP(3) rowtime0 versus TIMESTAMP(0) NOT NULL rowtime0.

The problem is that I have no field named rowtime0. It looks like an internal field. I don't really understand what is happening here.
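
My guess (unverified) is that rowtime0 is an internally generated copy of the rowtime column on the temporal-table side, and that the mismatch comes from the precision of my constant: TIMESTAMP('1970-01-01 00:00:00') seems to produce a TIMESTAMP(0), while the join side expects a TIMESTAMP(3). A literal with explicit milliseconds might align the precisions, something like:

Table staticJoined = tableEnv.sqlQuery(
        "SELECT *, TIMESTAMP '1970-01-01 00:00:00.000' as rowtime FROM table1 " +
        "join table3 on table1.id2 = table3.id2 " +
        "join table2 on table3.id1 = table2.id1 ");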

Answer

Your query defines regular joins, i.e., joins without a temporal join constraint. Since Flink treats all tables as dynamic (i.e., assumes that they might change in the future), a regular join without time constraints cannot guarantee that rows are emitted (roughly) in timestamp order. However, timestamp order is required for time attributes to ensure that subsequent operations (such as window aggregations) can be performed without fully materializing the stream. Therefore, Flink does not allow time attributes as input (and hence also output) of a regular join that does not preserve the time order.
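
For reference, the cast mentioned in the error message would look like the sketch below (using the table names from the question). Casting turns the time attribute into a plain TIMESTAMP column, so the regular join is accepted, but the result then carries no time attribute and event-time windows can no longer be applied to it:

// Sketch only: CAST removes the rowtime's time-attribute property.
Table joined = tableEnv.sqlQuery(
        "SELECT KafkaInput.id2, CAST(KafkaInput.rowtime AS TIMESTAMP) AS ts, Table3.someInfo3 " +
        "FROM KafkaInput " +
        "JOIN Table3 ON KafkaInput.id2 = Table3.id2");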

The problem would not exist if Flink were aware that the tables from the CSV files are fixed and not dynamic. However, this reasoning is not yet supported.

As a workaround, you can model the CSV tables as temporal tables (that are not changing) and join them with the Kafka table.
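
A minimal sketch of that workaround, reusing the names from the question (it assumes the joined CSV table staticJoined exposes a time attribute "rowtime" and a key "id2"):

// Expose the joined CSV data as a temporal table function keyed by id2.
TemporalTableFunction csvData =
        staticJoined.createTemporalTableFunction("rowtime", "id2");
tableEnv.registerFunction("CSVData", csvData);

// Join the Kafka stream against the temporal table, using the Kafka
// table's rowtime attribute as the lookup time.
Table enriched = tableEnv.sqlQuery(
        "SELECT k.id2, k.rowtime, s.someInfo3 " +
        "FROM KafkaInput AS k, " +
        "LATERAL TABLE (CSVData(k.rowtime)) AS s " +
        "WHERE s.id2 = k.id2");

Because a temporal table join preserves the rowtime attribute of the probe side (the Kafka table), the HOP window from the question can then be applied to the enriched table.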
