Flink Table API & SQL and map types (Scala)
Question
I am using Flink's Table API and/or Flink's SQL support (Flink 1.3.1, Scala 2.11) in a streaming environment. I'm starting with a `DataStream[Person]`, and `Person` is a case class that looks like:
```scala
Person(name: String, age: Int, attributes: Map[String, String])
```
All is working as expected until I start to bring `attributes` into the picture.
For example:
```scala
val result = streamTableEnvironment.sql(
  """
    |SELECT
    |  name,
    |  attributes['foo'],
    |  TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
    |FROM myTable
    |GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
    |""".stripMargin)
```
...results in:
```
Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
  at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
  at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
  at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:531)
  at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:530)
  at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
  at com.nordstrom.mdt.Job$.main(Job.scala:112)
  at com.nordstrom.mdt.Job.main(Job.scala)
```
Note: this error occurs whether or not the specific map key is present. Also note that if I do not specify a map key at all, I get a different error which makes sense; that scenario is not at play here.
This PR seems to say that there's a path forward: https://github.com/apache/flink/pull/3767. Looking specifically at the test case, it suggests that type information is possible with DataSets. But neither of the relevant methods, `fromDataStream` and `registerDataStream`, offers a way to provide type information.
Is this possible? In other words, can Flink SQL on Streams support maps?
Clarifying edit...
When omitting the map key (`GROUP BY ... attributes` rather than `attributes['foo']`), I get the error below. This indicates that the runtime does know that these are strings.
```
This type (interface scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)]) cannot be used as key.
```
Answer
Currently, Flink SQL supports only the Java `java.util.Map`. Scala maps are treated as a black box with Flink's `GenericTypeInfo`/SQL `ANY` data type. Therefore, you can forward these black boxes and use them within scalar functions, but accessing them with the `['key']` operator is not supported.
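A minimal sketch of the scalar-function route (the function and SQL names here are mine, not from the answer): the key lookup happens in plain Scala code, which a UDF can run even though the map itself reaches SQL as an `ANY` value. In a real job this logic would live in the `eval()` method of a class extending `org.apache.flink.table.functions.ScalarFunction`, registered with `tableEnv.registerFunction(...)`.

```scala
// Hypothetical lookup logic for such a UDF: takes the blackboxed Scala map
// and a key, and returns the value, or null (surfaced as SQL NULL) when the
// key is missing.
def mapValue(map: Map[String, String], key: String): String =
  map.getOrElse(key, null)

// After wrapping this in a ScalarFunction registered as MAP_VALUE, the query
// could read:
//   SELECT name, MAP_VALUE(attributes, 'foo') FROM myTable
```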
So either you use a Java map or you implement the access operation yourself in a UDF.
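A sketch of the Java-map workaround, assuming the case class can be changed: declaring `attributes` as a `java.util.Map` lets Flink's type extraction produce a proper map type instead of `GenericTypeInfo`/`ANY`, which is what the `['key']` operator needs. The conversion helper is my own addition.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Person with a Java map, so Flink does not fall back to ANY for attributes.
case class Person(name: String, age: Int, attributes: JMap[String, String])

// Helper to convert an existing Scala map when constructing Person instances.
def toJavaMap(m: Map[String, String]): JMap[String, String] = {
  val jm = new JHashMap[String, String]()
  m.foreach { case (k, v) => jm.put(k, v) }
  jm
}

// Example:
//   val p = Person("Ann", 30, toJavaMap(Map("foo" -> "bar")))
```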
I created an issue for your problem: https://issues.apache.org/jira/browse/FLINK-7360