Does Flink SQL support Java Map types?

Question

I'm trying to access a key from a map using Flink's SQL API. It fails with the following error: Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY. Please advise how I can fix it. Here is my event class:

import java.util.Map;

// Event POJO whose only field is a String-to-String map.
public class EventHolder {

    private Map<String, String> event;

    public Map<String, String> getEvent() {
        return event;
    }

    public void setEvent(Map<String, String> event) {
        this.event = event;
    }
}

Here is the main class that submits the Flink job:

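import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class MapTableSource {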
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<EventHolder> mapEventStream = env.fromCollection(getMaps());

        // register a table and use SQL
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("mapEvent", mapEventStream); 
        //tableEnv.registerFunction("orderSizeType", new OrderSizeType());

        Table alerts = tableEnv.sql(
                "select event['key'] from mapEvent ");

        DataStream<String> alertStream = tableEnv.toAppendStream(alerts, String.class);

        alertStream.filter(new FilterFunction<String>() {
            private static final long serialVersionUID = -2438621539037257735L;

            @Override
            public boolean filter(String value) throws Exception {
                System.out.println("Key value is:"+value);
                return value!=null;
            }
        });

        env.execute("map-tablsource-job");
    }

    private static List<EventHolder> getMaps(){
        List<EventHolder> list = new ArrayList<>();
        for(int i=0;i<5;i++){
            EventHolder holder = new EventHolder();
            Map<String,String> map = new HashMap<>();
            map.put("key", "value");
            holder.setEvent(map);
            list.add(holder);
        }
        return list;
    }
}

It throws the following exception at runtime:

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:529)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:529)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
at com.c.p.flink.MapTableSource.main(MapTableSource.java:25)

I'm using Flink 1.3.1.

Recommended answer

I think the problem lies in fromCollection. Because of Java limitations (i.e. type erasure), Flink is not able to extract the needed type information. Your map is therefore treated as a black box with the SQL type ANY. You can verify the types of your table with tableEnv.scan("mapEvent").printSchema(). You can specify the type information in fromCollection with Types.MAP(Types.STRING, Types.STRING).
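
To illustrate the type-erasure point, here is a minimal plain-Java sketch. It does not use Flink, and the class name TypeErasureDemo and its helper method are hypothetical names chosen for this example. It only shows that a map instance carries no key or value types at runtime, which is why a framework may need the type information to be supplied explicitly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class (not part of Flink): shows that generic type
// arguments are not present on a map instance at runtime.
public class TypeErasureDemo {

    // Returns true when both maps have the same runtime class,
    // regardless of the type arguments they were declared with.
    public static boolean sameRuntimeClass(Map<?, ?> a, Map<?, ?> b) {
        return a.getClass() == b.getClass();
    }

    public static void main(String[] args) {
        Map<String, String> strings = new HashMap<>();
        Map<Integer, Long> numbers = new HashMap<>();

        // Both instances are plain java.util.HashMap objects at runtime.
        System.out.println(sameRuntimeClass(strings, numbers)); // prints "true"
        System.out.println(strings.getClass().getName());       // prints "java.util.HashMap"
    }
}
```

Since the instance itself only reports java.util.HashMap, the key and value types have to come from somewhere else, such as the explicit type information suggested in the answer.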
