Flink Table API & SQL and map types (Scala)


Question


I am using Flink's Table API and/or Flink's SQL support (Flink 1.3.1, Scala 2.11) in a streaming environment. I'm starting with a DataStream[Person], and Person is a case class that looks like:

Person(name: String, age: Int, attributes: Map[String, String])


All is working as expected until I start to bring attributes into the picture.

For example:

val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)

...results in:


Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
    at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
    at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
    at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:531)
    at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.plan.logical.LogicalRelNode.(operators.scala:530)
    at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
    at com.nordstrom.mdt.Job$.main(Job.scala:112)
    at com.nordstrom.mdt.Job.main(Job.scala)


Note: this error occurs whether or not the specific map key is present. Also note that if I do not specify a map key at all, I get a different error which makes sense; that scenario is not at play here.


This PR seems to say that there's a path forward: https://github.com/apache/flink/pull/3767. Looking specifically at the test case, it suggests that type information is possible with DataSets. However, neither of the relevant methods, fromDataStream and registerDataStream, offers a way to provide type information.


Is this possible? In other words, can Flink SQL on Streams support maps?


Clarifying edit... When omitting the map key (GROUP BY ... attributes rather than attributes['foo']), I get the error below. This indicates that the runtime does know that these are strings.


This type (interface scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)]) cannot be used as key.

Answer


Currently, Flink SQL supports only Java's java.util.Map. Scala maps are treated as a black box with Flink's GenericTypeInfo (the SQL ANY data type). Therefore, you can forward these black boxes and use them within scalar functions, but access via the ['key'] operator is not supported.


So either you use a Java map or you implement the access operation yourself in a UDF.
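The Java-map route can be sketched like this: convert the Scala map to a java.util.Map before registering the stream with the table environment. A minimal sketch, assuming the rest of the pipeline is unchanged; the name JPerson and the toJava helper are hypothetical, not part of the question's code:

```scala
import scala.collection.JavaConverters._

// The original case class from the question:
case class Person(name: String, age: Int, attributes: Map[String, String])

// Hypothetical variant using java.util.Map, which Flink SQL can treat as a
// proper map type and address with attributes['foo']:
case class JPerson(name: String, age: Int, attributes: java.util.Map[String, String])

// Convert before registering, e.g. stream.map(toJava) prior to
// registerDataStream:
def toJava(p: Person): JPerson =
  JPerson(p.name, p.age, p.attributes.asJava)
```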


I created an issue for your problem: https://issues.apache.org/jira/browse/FLINK-7360
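The UDF route mentioned above could look roughly like the following. This is a sketch, not a confirmed implementation: in real Flink code the class would extend org.apache.flink.table.functions.ScalarFunction and be registered with the table environment (e.g. via registerFunction), and the query would call it as mapValue(attributes, 'foo') instead of attributes['foo']. Only the eval logic is shown so the snippet stays dependency-free; the names MapValue and mapValue are ours:

```scala
// Sketch of a map-access UDF. In real Flink code, this class would extend
// org.apache.flink.table.functions.ScalarFunction; Flink invokes eval
// reflectively with the row's map column and the requested key.
class MapValue /* extends ScalarFunction */ {
  def eval(map: Map[String, String], key: String): String =
    map.getOrElse(key, null)
}
```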

