"Rowtime attributes must not be in the input rows of a regular join" despite using interval join, but only with event timestamp

Question

Example code:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment


env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

table_env.execute_sql(
    """
    CREATE TABLE table1 (
        id INT,
        ts TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/home/alex/work/test-flink/data1.csv'
    )
"""
)


table_env.execute_sql(
    """
    CREATE TABLE table2 (
        id2 INT,
        ts2 TIMESTAMP(3),
        WATERMARK FOR ts2 AS ts2 - INTERVAL '5' SECOND
    ) WITH (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/home/alex/work/test-flink/data2.csv'
    )
"""
)

table1 = table_env.from_path("table1")
table2 = table_env.from_path("table2")

print(table1.join(table2).where("ts = ts2 && id = id2").select("id, ts").to_pandas())

This gives the error:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalLegacySink(name=[collect], fields=[id, ts])
+- FlinkLogicalCalc(select=[id, ts])
   +- FlinkLogicalJoin(condition=[AND(=($2, $5), =($0, $3))], joinType=[inner])
      :- FlinkLogicalCalc(select=[id, ts, CAST(ts) AS ts0])
      :  +- FlinkLogicalWatermarkAssigner(rowtime=[ts], watermark=[-($1, 5000:INTERVAL SECOND)])
      :     +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, table1, source: [CsvTableSource(read fields: id, ts)]]], fields=[id, ts])
      +- FlinkLogicalCalc(select=[id2, ts2, CAST(ts2) AS ts20])
         +- FlinkLogicalWatermarkAssigner(rowtime=[ts2], watermark=[-($1, 5000:INTERVAL SECOND)])
            +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, table2, source: [CsvTableSource(read fields: id2, ts2)]]], fields=[id2, ts2])

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

This seems different from other similar questions such as this one, because I have followed the instructions in the docs and specified both an equi-join predicate and a time condition (ts = ts2 && id = id2):

An interval join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.

For example, the following predicates are valid interval join conditions:

  • ltime = rtime

If the problem is that these are not append-only tables, I don't know how to make them so.

Setting the time characteristic doesn't help:

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic

StreamExecutionEnvironment.get_execution_environment().set_stream_time_characteristic(
    TimeCharacteristic.EventTime
)

If I use processing time instead, with ts AS PROCTIME(), then the query succeeds. But I think I need to use event time, and I don't understand why there is this difference.

Recommended answer

Joins between two regular tables in SQL are always expressed in the same way using FROM a, b or a JOIN b.

However, Flink provides two types of join operators under the hood for the same syntax. One is the interval join, which requires time attributes to relate both tables to each other based on time. The other is the regular SQL join, which is implemented in the generic way you know from databases.

Interval joins are just a streaming optimization to keep the state size low during runtime and produce no updates in the result. The regular SQL join operator can produce the same result as an interval join in the end, but with higher maintenance costs.
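To make the state-size argument concrete, here is a toy model in plain Python. It is not Flink's operator: it assumes events arrive in ascending timestamp order (Flink uses watermarks instead), and the names toy_stream_join, bound and evict are invented for this illustration. Both variants apply the same join predicate; the only difference is whether rows that can no longer match are dropped from state.

```python
def toy_stream_join(events, bound, evict):
    """Join two interleaved streams on key with |ts_left - ts_right| <= bound.

    events: iterable of (side, key, ts), side is "L" or "R", sorted by ts.
    evict=True mimics an interval join: rows too old to ever match are dropped.
    evict=False mimics a regular join: every row is kept in state forever.
    Returns (matches, peak_number_of_rows_held_in_state).
    """
    state = {"L": [], "R": []}
    matches, peak = [], 0
    for side, key, ts in events:
        other = "R" if side == "L" else "L"
        if evict:
            # Future events have timestamps >= ts, so rows older than
            # ts - bound can never satisfy the time predicate again.
            for s in state:
                state[s] = [(k, t) for k, t in state[s] if t >= ts - bound]
        for k, t in state[other]:
            if k == key and abs(t - ts) <= bound:
                left, right = ((key, ts), (k, t)) if side == "L" else ((k, t), (key, ts))
                matches.append((left, right))
        state[side].append((key, ts))
        peak = max(peak, len(state["L"]) + len(state["R"]))
    return matches, peak


events = [("L", 1, 0), ("R", 1, 1), ("L", 2, 10), ("R", 2, 10), ("L", 3, 20), ("R", 3, 21)]
interval_matches, interval_peak = toy_stream_join(events, bound=1, evict=True)
regular_matches, regular_peak = toy_stream_join(events, bound=1, evict=False)
```

On this input both variants return the same three matches, but the evicting variant never holds more than two rows while the non-evicting one ends up holding all six.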

In order to distinguish between an interval join and a regular join, the optimizer searches for a predicate in the WHERE clause that works on time attributes. For an interval join, the output can always contain two rowtime attributes for outer temporal operations (downstream temporal operators), because both rowtime attributes are still aligned with the underlying watermarking system. This means that, for example, an outer window or another interval join could work with the time attribute again.

However, the implementation of interval joins has some shortcomings that are known and covered in FLINK-10211. Due to the bad design, we cannot distinguish between an interval join and a regular join at certain locations. Thus, we need to assume that a regular join could be an interval join, and we cannot automatically cast the time attribute to TIMESTAMP for users. Instead, we currently forbid time attributes in the output of regular joins.

At some point this limitation will hopefully be gone. Until then, a user has two possibilities:

  1. Don't use a regular join on tables that contain a time attribute. You can also just project it away with a nested SELECT clause or do a CAST before joining.

  2. Cast the time attribute to a regular timestamp using CAST(col AS TIMESTAMP) in the SELECT clause. It will be pushed down into the join operation.
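As a rough sketch of the cast workaround applied to the question's tables: the query below casts both rowtime columns to plain TIMESTAMP(3) in nested SELECTs before the join, so the planner sees a regular join without time attributes in its input. The aliases a and b, the constant CAST_JOIN_SQL and the helper run_cast_join are made up for this illustration, and the query has not been verified against a particular Flink version.

```python
# Assumption: table1 and table2 are registered with the DDL from the question.
CAST_JOIN_SQL = """
    SELECT a.id, a.ts
    FROM (SELECT id, CAST(ts AS TIMESTAMP(3)) AS ts FROM table1) AS a
    JOIN (SELECT id2, CAST(ts2 AS TIMESTAMP(3)) AS ts2 FROM table2) AS b
      ON a.id = b.id2 AND a.ts = b.ts2
"""


def run_cast_join(table_env):
    """Run the regular join on an existing StreamTableEnvironment.

    After the casts, ts and ts2 are ordinary timestamps, so the result no
    longer carries a rowtime attribute usable by downstream windows.
    """
    return table_env.sql_query(CAST_JOIN_SQL).to_pandas()
```

The trade-off is the one described above: this runs as a regular join, so Flink keeps both inputs in state rather than expiring old rows.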

Your exception indicates that you are using a regular join. Interval joins need a range to operate (even if it is only 1 ms). They don't support equality.
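Following that advice, one way to phrase the question's join so that the planner can treat it as an interval join is to replace ts = ts2 with a bounded range. The sketch below assumes the tables from the question; the 1-second bound is an arbitrary illustrative choice (it matches more rows than strict equality would), and INTERVAL_JOIN_SQL and run_interval_join are hypothetical names.

```python
# Assumption: table1 and table2 are registered with the DDL from the question.
INTERVAL_JOIN_SQL = """
    SELECT id, ts
    FROM table1, table2
    WHERE id = id2
      AND ts BETWEEN ts2 - INTERVAL '1' SECOND AND ts2 + INTERVAL '1' SECOND
"""


def run_interval_join(table_env):
    """Run the range-bounded join on an existing StreamTableEnvironment."""
    return table_env.sql_query(INTERVAL_JOIN_SQL).to_pandas()
```

The equi-join predicate id = id2 is kept, since the docs quoted in the question require at least one alongside the time bound.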
