Flink Table API & SQL and map types (Scala)


Question


I am using Flink's Table API and/or Flink's SQL support (Flink 1.3.1, Scala 2.11) in a streaming environment. I'm starting with a DataStream[Person], and Person is a case class that looks like:

case class Person(name: String, age: Int, attributes: Map[String, String])


All is working as expected until I start to bring attributes into the picture.

For example:

val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)

...results in:

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
    at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
    at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
    at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:531)
    at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.plan.logical.LogicalRelNode.(operators.scala:530)
    at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
    at com.nordstrom.mdt.Job$.main(Job.scala:112)
    at com.nordstrom.mdt.Job.main(Job.scala)


Note: this error occurs whether or not the specific map key is present. Also note that if I do not specify a map key at all, I get a different error which makes sense; that scenario is not at play here.


This PR seems to say that there's a path forward: https://github.com/apache/flink/pull/3767. Looking specifically at the test case, it suggests that type information is possible with DataSets. However, neither of the relevant methods, fromDataStream and registerDataStream, offers a way to provide type information.


Is this possible? In other words, can Flink SQL on Streams support maps?


Clarifying edit... When omitting the map key (GROUP BY ... attributes rather than attributes['foo']), I get the error below. This indicates that the runtime does know that these are strings.


This type (interface scala.collection.immutable.Map[scala.Tuple2(_1: String, _2: String)]) cannot be used as key.

Answer


Currently, Flink SQL supports only Java's java.util.Map. Scala maps are treated as a black box with Flink's GenericTypeInfo/SQL ANY data type. Therefore, you can forward these black boxes and use them within scalar functions, but accessing them with the ['key'] operator is not supported.


So either use a Java map, or implement the access operation yourself in a UDF.
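A minimal sketch of both workarounds, assuming plain Scala outside any Flink job; JPerson, toJPerson, and mapGet are hypothetical names, not Flink API. For the first workaround, the DataStream elements would be mapped through toJPerson before registering the table; for the second, the mapGet logic would live inside a class extending org.apache.flink.table.functions.ScalarFunction and be registered with the table environment.

```scala
import scala.collection.JavaConverters._

// Workaround 1 (assumed): switch the attributes field to a Java map so
// Flink SQL can derive a concrete map type instead of ANY.
case class JPerson(name: String, age: Int, attributes: java.util.Map[String, String])

def toJPerson(name: String, age: Int, attrs: Map[String, String]): JPerson =
  JPerson(name, age, attrs.asJava)

// Workaround 2 (assumed): the key lookup a scalar UDF's eval method could
// perform on the original Scala map, returning null for a missing key.
def mapGet(map: Map[String, String], key: String): String =
  map.getOrElse(key, null)
```

With the Java map in place, an expression like attributes['foo'] should resolve to a supported SQL type rather than ANY; with the UDF approach, the query would call the registered function instead of using the ['key'] operator.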


I created an issue for your problem: https://issues.apache.org/jira/browse/FLINK-7360
