Apache Flink: How to enable "upsert mode" for dynamic tables?


Question

I have seen several mentions of an "upsert mode" for dynamic tables based on a unique key in the Flink documentation and on the official Flink blog. However, I do not see any examples / documentation regarding how to enable this mode on a dynamic table.

Examples:

When defining a dynamic table on a stream via update mode, we can specify a unique key attribute on the table. In that case, update and delete operations are performed with respect to the key attribute. The update mode is visualized in the following figure.

  • Documentation:

    A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key.

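So my questions are:

  • How do I specify a unique key attribute on a dynamic table in Flink?
  • How do I place a dynamic table in update/upsert/"replace" mode, as opposed to append mode?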

Recommended answer

Update: since Flink 1.9, LAST_VALUE is one of the built-in aggregate functions, provided we use the Blink planner (which is the default since Flink 1.11).

Assuming the existence of the Logins table mentioned in the response of Fabian Hueske above, we can now convert it to an upsert table as simply as:

    -- `user` is a reserved keyword in Flink SQL, hence the backticks.
    SELECT
      `user`,
      LAST_VALUE(loginTime) AS loginTime,
      LAST_VALUE(ip) AS ip
    FROM Logins
    GROUP BY `user`
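
To make the effect of the query above more concrete, here is a minimal plain-Python sketch of what grouping by user and keeping the last value per column amounts to: each incoming login either inserts a new key or overwrites the existing row for that key. This is only an illustration of the semantics, not Flink code. The sample rows and the helper names (upsert_changelog, materialize) are made up for this example, and details such as null handling and retractions are ignored.

```python
from typing import Dict, Iterable, Iterator, Tuple

# (user, loginTime, ip): column names follow the query above.
Login = Tuple[str, str, str]


def upsert_changelog(rows: Iterable[Login]) -> Iterator[Tuple[str, Login]]:
    """Yield ("INSERT" | "UPDATE", row) messages keyed by user."""
    latest: Dict[str, Login] = {}
    for row in rows:
        user = row[0]
        op = "UPDATE" if user in latest else "INSERT"
        latest[user] = row  # the newest row replaces the previous one
        yield op, row


def materialize(rows: Iterable[Login]) -> Dict[str, Login]:
    """Apply the changelog to an empty table keyed by user."""
    table: Dict[str, Login] = {}
    for _op, row in upsert_changelog(rows):
        table[row[0]] = row  # insert and update are both a keyed overwrite
    return table


# Hypothetical sample data, in arrival order.
logins = [
    ("Mary", "2017-03-02 12:00:00", "10.0.0.1"),
    ("Bob", "2017-03-02 12:05:00", "10.0.0.2"),
    ("Mary", "2017-03-02 12:30:00", "10.0.0.3"),
]

changelog = list(upsert_changelog(logins))
table = materialize(logins)

for op, row in changelog:
    print(op, row)
print(table)
```

With this input the second Mary row arrives as an update rather than as a new row, so the resulting table holds one row per user. That is the behaviour an append-mode table would not give you.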
    
