Apache Flink: How to enable "upsert mode" for dynamic tables?

Question

I have seen several mentions of an "upsert mode" for dynamic tables based on a unique key in the Flink documentation and on the official Flink blog. However, I do not see any examples / documentation regarding how to enable this mode on a dynamic table.

Examples:

  • Blog post:

    When defining a dynamic table on a stream via update mode, we can specify a unique key attribute on the table. In that case, update and delete operations are performed with respect to the key attribute. The update mode is visualized in the following figure.

  • Documentation:

    A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key.

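So my questions are:

  • How do I specify a unique key attribute on a dynamic table in Flink?
  • How do I place a dynamic table in "update/upsert/replace" mode, as opposed to append mode?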

Recommended answer

The linked resources describe two different scenarios.

  • The blog post discusses an upsert DataStream -> Table conversion.
  • The documentation describes the inverse upsert Table -> DataStream conversion.

The following discussion is based on Flink 1.4.0 (Jan. 2018).

Upsert DataStream -> Table Conversion

Converting a DataStream into a Table by upsert on keys is not natively supported, but it is on the roadmap. Meanwhile, you can emulate this behavior using an append Table and a query with a user-defined aggregation function.

If you have an append Table Logins with the schema (user, loginTime, ip) that tracks user logins, you can convert it into an upsert Table keyed on user with the following query:

    SELECT user, LAST_VAL(loginTime), LAST_VAL(ip) FROM Logins GROUP BY user
    

The LAST_VAL aggregation function is a user-defined aggregation function that always returns the most recently added value.
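To illustrate what this query computes, here is a plain-Python model. It is not Flink code. `LastVal` and `last_val_upsert` are hypothetical names. In Flink itself, LAST_VAL would be written as a subclass of the Table API's `AggregateFunction` and registered before use. The model assumes "latest" means latest in arrival order, which is the order a last-value accumulator sees.

```python
from typing import Dict, Iterable, List, Tuple

# One Logins row: (user, loginTime, ip), matching the schema in the text.
LoginRow = Tuple[str, int, str]


class LastVal:
    """Accumulator of a LAST_VAL-style aggregate: keeps the most recently added value."""

    def __init__(self) -> None:
        self.value = None

    def accumulate(self, value) -> None:
        self.value = value  # each new input overwrites the previous one

    def get_value(self):
        return self.value


def last_val_upsert(logins: Iterable[LoginRow]) -> Tuple[Dict[str, LoginRow], List[LoginRow]]:
    """Emulate: SELECT user, LAST_VAL(loginTime), LAST_VAL(ip) FROM Logins GROUP BY user.

    Returns the final result table keyed on user, plus the updated result row
    produced after each input row (the upsert changelog keyed on user).
    """
    accs: Dict[str, Tuple[LastVal, LastVal]] = {}
    changelog: List[LoginRow] = []
    for user, login_time, ip in logins:
        if user not in accs:
            accs[user] = (LastVal(), LastVal())  # one accumulator per aggregate call
        time_acc, ip_acc = accs[user]
        time_acc.accumulate(login_time)
        ip_acc.accumulate(ip)
        changelog.append((user, time_acc.get_value(), ip_acc.get_value()))
    table = {user: (user, t.get_value(), i.get_value()) for user, (t, i) in accs.items()}
    return table, changelog


logins = [("alice", 1, "10.0.0.1"), ("bob", 2, "10.0.0.2"), ("alice", 3, "10.0.0.9")]
table, changelog = last_val_upsert(logins)
print(table["alice"])  # ('alice', 3, '10.0.0.9')
print(len(changelog))  # 3
```

Each new login for a user replaces that user's previous result row. This is upsert semantics keyed on user, obtained from an append-only input.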

Native support for upsert DataStream -> Table conversion would work essentially the same way, but with a more concise API.

Upsert Table -> DataStream Conversion

Converting a Table into an upsert DataStream is not supported. This is also correctly reflected in the documentation:

    Please note that only append and retract streams are supported when converting a dynamic table into a DataStream.
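A retract stream needs no key on the consumer side. In Flink 1.4, `StreamTableEnvironment.toRetractStream(table, Row.class)` produces a `DataStream<Tuple2<Boolean, Row>>`, where `true` marks an added row and `false` a retracted one.

The plain-Python sketch below models that encoding for keyed result updates, such as those of the LAST_VAL query. The helper names are hypothetical. The model assumes the producer knows the key, as Flink's query planner does.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple

Row = Tuple[str, int, str]  # a result row such as (user, loginTime, ip)
RetractMessage = Tuple[bool, Row]  # (True, row) = add, (False, row) = retract


def to_retract_stream(updates: Iterable[Row], key_index: int = 0) -> List[RetractMessage]:
    """Encode keyed result updates as a retract stream.

    The producer uses the key to find the previous row; an update becomes a
    retraction of the old row followed by an addition of the new one.
    """
    current: Dict[object, Row] = {}
    out: List[RetractMessage] = []
    for row in updates:
        key = row[key_index]
        if key in current:
            out.append((False, current[key]))  # retract the previous result row
        out.append((True, row))  # add the new result row
        current[key] = row
    return out


def materialize(stream: Iterable[RetractMessage]) -> Counter:
    """Rebuild the result table as a multiset of rows; no key is required."""
    table: Counter = Counter()
    for is_add, row in stream:
        table[row] += 1 if is_add else -1
        if table[row] == 0:
            del table[row]  # a fully retracted row disappears
    return table


updates = [("alice", 1, "10.0.0.1"), ("bob", 2, "10.0.0.2"), ("alice", 3, "10.0.0.9")]
stream = to_retract_stream(updates)
for message in stream:
    print(message)
print(sorted(materialize(stream)))  # [('alice', 3, '10.0.0.9'), ('bob', 2, '10.0.0.2')]
```

Only the producer needs the key. The consumer reconstructs the table by adding and removing whole rows.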

We deliberately chose not to support upsert Table -> DataStream conversions, because an upsert DataStream can only be processed if its key attributes are known. These depend on the query and are not always straightforward to identify. It would be the developer's responsibility to make sure that the key attributes are interpreted correctly; failing to do so would result in faulty programs. To avoid such problems, we decided not to offer the upsert Table -> DataStream conversion.
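The key-interpretation risk described above can be shown with a small plain-Python sketch (not Flink code). It assumes a hypothetical result of `GROUP BY user, region`, whose unique key is the composite `(user, region)`. The sketch applies the same upsert rows twice: once with the correct key and once with a mistaken key of `user` only. The helper `apply_with_key` is hypothetical.

```python
from typing import Dict, Iterable, Sequence, Tuple

Row = Tuple[str, str, int]  # (user, region, cnt), e.g. a result of GROUP BY user, region


def apply_with_key(rows: Iterable[Row], key_indices: Sequence[int]) -> Dict[tuple, Row]:
    """Interpret each row as an upsert on the given key columns."""
    table: Dict[tuple, Row] = {}
    for row in rows:
        table[tuple(row[i] for i in key_indices)] = row  # later rows replace earlier ones
    return table


upserts = [("Mary", "EU", 1), ("Mary", "US", 1), ("Mary", "EU", 2)]

correct = apply_with_key(upserts, key_indices=[0, 1])  # true key: (user, region)
wrong = apply_with_key(upserts, key_indices=[0])  # mistaken key: user only

print(sorted(correct.values()))  # [('Mary', 'EU', 2), ('Mary', 'US', 1)]
print(sorted(wrong.values()))  # [('Mary', 'EU', 2)]: the US row was silently lost
```

Nothing fails at runtime with the wrong key. Rows are silently overwritten, which is why such programs are hard to detect and debug.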

Instead, users can convert a Table into a retraction DataStream. Moreover, we support the UpsertStreamTableSink interface, which writes an upsert DataStream to an external system such as a database or key-value store.
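The following plain-Python model sketches the contract of an upsert sink as the Flink 1.4 documentation describes `UpsertStreamTableSink`:

  • Flink tells the sink the table's unique key fields (via `setKeyFields`).
  • Flink then emits `Tuple2<Boolean, T>` messages, where `true` means upsert and `false` means delete.

`KeyValueStoreSink`, `set_key_fields`, and `emit` are hypothetical Python names. The dictionary stands in for the external database or key-value store.

```python
from typing import Dict, List, Optional

Row = Dict[str, object]


class KeyValueStoreSink:
    """Model of an upsert sink writing into a key-value store.

    The planner supplies the unique key fields; each message is a
    (flag, row) pair where True means upsert and False means delete.
    """

    def __init__(self) -> None:
        self.key_fields: Optional[List[str]] = None
        self.store: Dict[tuple, Row] = {}  # stands in for a database or KV store

    def set_key_fields(self, key_fields: List[str]) -> None:
        self.key_fields = list(key_fields)

    def emit(self, is_upsert: bool, row: Row) -> None:
        if self.key_fields is None:
            raise RuntimeError("key fields must be set before emitting")
        key = tuple(row[f] for f in self.key_fields)
        if is_upsert:
            self.store[key] = dict(row)  # insert or overwrite by key
        else:
            self.store.pop(key, None)  # delete by key


sink = KeyValueStoreSink()
sink.set_key_fields(["user"])  # in Flink, the planner derives the key from the query
for flag, row in [
    (True, {"user": "alice", "loginTime": 1, "ip": "10.0.0.1"}),
    (True, {"user": "bob", "loginTime": 2, "ip": "10.0.0.2"}),
    (True, {"user": "alice", "loginTime": 3, "ip": "10.0.0.9"}),
    (False, {"user": "bob", "loginTime": 2, "ip": "10.0.0.2"}),
]:
    sink.emit(flag, row)

print(sink.store)  # {('alice',): {'user': 'alice', 'loginTime': 3, 'ip': '10.0.0.9'}}
```

Because the key comes from the query planner rather than from the developer, the sink cannot misinterpret it. That is the problem an upsert Table -> DataStream conversion would expose.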
