Flink表获取类型信息 [英] Flink table get type information
本文介绍了Flink表获取类型信息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
假设我有一个Flink表CREATE TABLE source(id int, name string) with (...)
和一个目标表CREATE TABLE destination(id int, unique_name string) with (...)
。unique_name
使用内部Flink流程函数中的业务逻辑计算。
因此,我们可以安全地假设源模式将与目标模式完全相同(名称和数据类型)。
我做了一些低级的process
使用数据流API来获取destination
数据流。它有outputType
ASGenericType<org.apache.flink.types.Row>
。当我再次将destination
数据流转换回表时,出现以下错误。
org.apache.flink.table.api.ValidationException: Column types of query result and sink
for registered table 'default_catalog.default_database.destination' do not match.
Cause: Different number of columns.
Query schema: [f0: RAW('org.apache.flink.types.Row', '...')]
Sink schema: [id: INT, name: STRING]
虽然我可以使用下面的代码解决此问题,但是我希望泛化此问题并从目标Table
获取RowTypeInformation
。有没有办法从FlinkTable
获取TypeInformation
。
tableEnv.fromDataStream(destionationDataStream.map(x -> x).returns(Types.ROW(Types.Int, Types.String))
推荐答案
表型系统比TypeInformation
丰富。如果您不介意使用内部类,可以使用org.apache.flink.table.runtime.typeutils.ExternalTypeInfo
。TypeInformation
可以使用表接口的DataType
配置。
如果您喜欢使用官方支持的API。您可以用TypeInformation
声明In和Out类型,在调用StreamTableEnvironment.toDataStream(..., DataType)
时使用DataTypes.of(TypeInformation)
这篇关于Flink表获取类型信息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文