Apache Flink: How to enable "upsert mode" for dynamic tables?
Question
I have seen several mentions of an "upsert mode" for dynamic tables based on a unique key in the Flink documentation and on the official Flink blog. However, I cannot find any examples or documentation showing how to enable this mode on a dynamic table.
Examples:
- Blog post: "When defining a dynamic table on a stream via update mode, we can specify a unique key attribute on the table. In that case, update and delete operations are performed with respect to the key attribute. The update mode is visualized in the following figure."
- Documentation: "A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key."
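So my questions are:
- How do I specify a unique key attribute on a dynamic table in Flink?
- How do I place a dynamic table in update/upsert/"replace" mode, as opposed to append mode?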
Recommended Answer
The linked resources describe two different scenarios.
- The blog post discusses an upsert DataStream -> Table conversion.
- The documentation describes the inverse upsert Table -> DataStream conversion.
The following discussion is based on Flink 1.4.0 (Jan. 2018).
Upsert DataStream -> Table Conversion
Converting a DataStream into a Table by upsert on keys is not natively supported, but it is on the roadmap. Meanwhile, you can emulate this behavior using an append Table and a query with a user-defined aggregation function.
If you have an append Table Logins with the schema (user, loginTime, ip) that tracks user logins, you can convert it into an upsert Table keyed on user with the following query:
SELECT user, LAST_VAL(loginTime), LAST_VAL(ip) FROM Logins GROUP BY user
LAST_VAL is a user-defined aggregation function that always returns the most recently added value.
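To make the semantics of the query concrete, here is a minimal sketch in plain Java. It is an illustration only, not Flink code: the class LastValSketch, the accumulator LastValAcc, and the helper lastIpPerUser are hypothetical names, and rows are simplified to {user, ip} pairs. The three static methods mirror the createAccumulator / accumulate / getValue contract that a Flink user-defined aggregate function follows, but the class deliberately does not extend any Flink type so that it runs standalone.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LastValSketch {

    /** Accumulator holding the most recently added value (null until the first row). */
    static final class LastValAcc<T> {
        T last;
    }

    // Hypothetical stand-ins for the methods of a user-defined aggregate function.
    static <T> LastValAcc<T> createAccumulator() {
        return new LastValAcc<T>();
    }

    static <T> void accumulate(LastValAcc<T> acc, T value) {
        acc.last = value; // later rows overwrite earlier ones
    }

    static <T> T getValue(LastValAcc<T> acc) {
        return acc.last;
    }

    /** Models SELECT user, LAST_VAL(ip) FROM Logins GROUP BY user over rows of {user, ip}. */
    static Map<String, String> lastIpPerUser(List<String[]> logins) {
        Map<String, LastValAcc<String>> accs = new LinkedHashMap<>();
        for (String[] row : logins) {
            // One accumulator per grouping key, created on first sight of the key.
            LastValAcc<String> acc =
                accs.computeIfAbsent(row[0], k -> LastValSketch.<String>createAccumulator());
            accumulate(acc, row[1]);
        }
        Map<String, String> result = new LinkedHashMap<>();
        accs.forEach((user, acc) -> result.put(user, getValue(acc)));
        return result;
    }

    public static void main(String[] args) {
        List<String[]> logins = Arrays.asList(
            new String[] {"alice", "10.0.0.1"},
            new String[] {"bob", "10.0.0.2"},
            new String[] {"alice", "10.0.0.3"});
        // prints {alice=10.0.0.3, bob=10.0.0.2}
        System.out.println(lastIpPerUser(logins));
    }
}
```

Each new login for a user replaces that user's previous result row, which is exactly the upsert-on-key behavior the query emulates.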
Native support for upsert DataStream -> Table conversion would work in essentially the same way, but with a more concise API.
Upsert Table -> DataStream Conversion
Converting a Table into an upsert DataStream is not supported. This is also properly reflected in the documentation:
Please note that only append and retract streams are supported when converting a dynamic table into a DataStream.
We deliberately chose not to support upsert Table -> DataStream conversions, because an upsert DataStream can only be processed if the key attributes are known. These depend on the query and are not always straightforward to identify. It would be the developer's responsibility to make sure that the key attributes are interpreted correctly, and failing to do so would result in faulty programs. To avoid such problems, we decided not to offer the upsert Table -> DataStream conversion.
Instead, users can convert a Table into a retraction DataStream. Moreover, we support UpsertTableSink, which writes an upsert DataStream to an external system, such as a database or key-value store.
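As an illustration of how a retraction stream can be consumed, the following plain-Java sketch folds retract messages into keyed state. It is a model of the semantics, not Flink code: the class RetractToKeyedState, the Change type, and the choice of user as the key are assumptions made for this example. In Flink 1.4 the real stream would, as far as I know, come from tableEnv.toRetractStream(table, Row.class), which yields Tuple2<Boolean, Row> records where true marks an added row and false a retracted one.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetractToKeyedState {

    /** One retract-stream message: isAdd == true adds the row, false retracts it. */
    static final class Change {
        final boolean isAdd;
        final String user;
        final String ip;

        Change(boolean isAdd, String user, String ip) {
            this.isAdd = isAdd;
            this.user = user;
            this.ip = ip;
        }
    }

    /**
     * Applies retract messages to a map keyed on user. The caller has to know
     * that user is the key; nothing in the messages themselves says so.
     */
    static Map<String, String> apply(List<Change> changes) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.isAdd) {
                state.put(c.user, c.ip);
            } else {
                // Remove the entry only if it still holds the retracted row.
                state.remove(c.user, c.ip);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
            new Change(true, "alice", "10.0.0.1"),
            new Change(true, "bob", "10.0.0.2"),
            new Change(false, "alice", "10.0.0.1"), // an update arrives as retract + add
            new Change(true, "alice", "10.0.0.3"));
        // prints {bob=10.0.0.2, alice=10.0.0.3}
        System.out.println(apply(changes));
    }
}
```

The sketch also shows why the key matters: the consumer picks user as the key itself, and choosing the wrong attribute would silently produce incorrect state.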