Flink表获取类型信息 [英] Flink table get type information

查看:12
本文介绍了Flink表获取类型信息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有一个Flink表CREATE TABLE source(id int, name string) with (...)和一个目标表CREATE TABLE destination(id int, unique_name string) with (...)unique_name使用内部Flink流程函数中的业务逻辑计算。

因此,我们可以安全地假设源模式将与目标模式完全相同(名称和数据类型)。 我做了一些低级的process使用数据流API来获取destination数据流。它有outputTypeASGenericType<org.apache.flink.types.Row>。当我再次将destination数据流转换回表时,出现以下错误。

org.apache.flink.table.api.ValidationException: Column types of query result and sink 
for registered table 'default_catalog.default_database.destination' do not match.
Cause: Different number of columns.

Query schema: [f0: RAW('org.apache.flink.types.Row', '...')]
Sink schema:  [id: INT, name: STRING]
虽然我可以使用下面的代码解决此问题,但是我希望泛化此问题并从目标Table获取RowTypeInformation。有没有办法从FlinkTable获取TypeInformation

tableEnv.fromDataStream(destionationDataStream.map(x -> x).returns(Types.ROW(Types.Int, Types.String))

推荐答案

表型系统比TypeInformation丰富。如果您不介意使用内部类,可以使用org.apache.flink.table.runtime.typeutils.ExternalTypeInfoTypeInformation可以使用表接口的DataType配置。

如果您喜欢使用官方支持的API。您可以用TypeInformation声明In和Out类型,在调用StreamTableEnvironment.toDataStream(..., DataType)时使用DataTypes.of(TypeInformation)

这篇关于Flink表获取类型信息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆