Flink Table API & SQL and map types (Scala)
Question
I am using Flink's Table API and/or Flink's SQL support (Flink 1.3.1, Scala 2.11) in a streaming environment. I'm starting with a DataStream[Person], and Person is a case class that looks like:
case class Person(name: String, age: Int, attributes: Map[String, String])
All is working as expected until I start to bring attributes into the picture.
For example:
val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)
...results in:
Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
  at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
  at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
  at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:531)
  at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:530)
  at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
  at com.nordstrom.mdt.Job$.main(Job.scala:112)
  at com.nordstrom.mdt.Job.main(Job.scala)
Note: this error occurs whether or not the specific map key is present. Also note that if I do not specify a map key at all, I get a different error which makes sense; that scenario is not at play here.
This PR seems to say that there's a path forward: https://github.com/apache/flink/pull/3767. Looking specifically at the test case, it suggests that type information is possible with DataSets. Neither of the relevant methods, fromDataStream and registerDataStream, offers a way to provide type information.
Is this possible? In other words, can Flink SQL on Streams support maps?
Clarifying edit...
When omitting the map key (GROUP BY ... attributes rather than attributes['foo']), I get the error below. This indicates that the runtime does know that these are strings.
This type (interface scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)]) cannot be used as key.
Answer
Currently, Flink SQL supports only the Java java.util.Map. Scala maps are treated as a black box with Flink's GenericTypeInfo/SQL ANY data type. Therefore, you can forward these black boxes and use them within scalar functions, but accessing them with the ['key'] operator is not supported.
So either you use a Java map or you implement the access operation yourself in a UDF.
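The first option means converting the stream's payload to a java.util.Map before registering it. A minimal sketch of that conversion (the PersonJ name and the helper are illustrative, not from the original post):

```scala
import scala.collection.JavaConverters._

// Variant of the question's case class carrying a java.util.Map, which
// Flink SQL can treat as a proper MAP type instead of an opaque ANY.
case class PersonJ(name: String, age: Int, attributes: java.util.Map[String, String])

// Convert the Scala-map payload to the Java-map variant, e.g. with
//   personStream.map(p => toJavaMapPerson(p.name, p.age, p.attributes))
// before calling registerDataStream.
def toJavaMapPerson(name: String, age: Int, attributes: Map[String, String]): PersonJ =
  PersonJ(name, age, attributes.asJava)
```

With the Java map in place, the attributes['foo'] access in the original query should no longer hit the "Type is not supported: ANY" error.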
I created an issue for your problem: https://issues.apache.org/jira/browse/FLINK-7360
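For the UDF route, a registered ScalarFunction can do the key lookup on the otherwise-opaque Scala map. A rough sketch, untested against 1.3.1 (the MapValue name and registration wiring are illustrative):

```scala
import org.apache.flink.table.functions.ScalarFunction

// Scalar UDF that looks a key up in the (ANY-typed) Scala map,
// returning null when the key is absent.
class MapValue extends ScalarFunction {
  def eval(map: Map[String, String], key: String): String =
    map.getOrElse(key, null)
}

// Register it and call it in SQL instead of the unsupported ['key'] operator:
//   streamTableEnvironment.registerFunction("mapValue", new MapValue)
//   SELECT name, mapValue(attributes, 'foo') FROM myTable
```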