Does Flink SQL support Java Map types?


Question

I'm trying to access a key from a map using Flink's SQL API. It fails with the following error: Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY. Please advise how I can fix it. Here is my event class:

import java.util.Map;

// POJO wrapping the map whose key is queried from SQL.
public class EventHolder {

    private Map<String, String> event;

    public Map<String, String> getEvent() {
        return event;
    }

    public void setEvent(Map<String, String> event) {
        this.event = event;
    }
}

Here is the main class that submits the Flink job:

public class MapTableSource {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<EventHolder> mapEventStream = env.fromCollection(getMaps());

        // register a table and use SQL
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("mapEvent", mapEventStream); 
        //tableEnv.registerFunction("orderSizeType", new OrderSizeType());

        Table alerts = tableEnv.sql(
                "select event['key'] from mapEvent ");

        DataStream<String> alertStream = tableEnv.toAppendStream(alerts, String.class);

        alertStream.filter(new FilterFunction<String>() {
            private static final long serialVersionUID = -2438621539037257735L;

            @Override
            public boolean filter(String value) throws Exception {
                System.out.println("Key value is:"+value);
                return value!=null;
            }
        });

        env.execute("map-tablsource-job");
    }

    private static List<EventHolder> getMaps(){
        List<EventHolder> list = new ArrayList<>();
        for(int i=0;i<5;i++){
            EventHolder holder = new EventHolder();
            Map<String,String> map = new HashMap<>();
            map.put("key", "value");
            holder.setEvent(map);
            list.add(holder);
        }
        return list;
    }
}

When I run it, I get the following exception:

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:529)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:529)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
at com.c.p.flink.MapTableSource.main(MapTableSource.java:25)

I'm using Flink 1.3.1.

Recommended answer

I think the problem lies in fromCollection. Because of Java limitations (i.e., type erasure), Flink cannot extract the needed type information, so your map is treated as a black box with the SQL ANY type. You can verify the types of your table with tableEnv.scan("mapEvent").printSchema(). You can specify the type information in fromCollection with Types.MAP(Types.STRING, Types.STRING).
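
To make the type erasure point concrete, here is a minimal plain-Java sketch. It uses no Flink classes and does not reproduce Flink's own type extraction logic. It only shows the general Java behavior the answer refers to: a map instance does not carry its key and value types at runtime. The class name ErasureDemo and its method names are made up for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, not part of the original question or of Flink.
class ErasureDemo {

    // Returns the runtime class name of a Map<String, String> instance.
    // The <String, String> parameters are absent from the result.
    static String runtimeClassName() {
        Map<String, String> event = new HashMap<>();
        event.put("key", "value");
        return event.getClass().getName();
    }

    // Two maps declared with different type parameters share one runtime class,
    // so the parameters cannot be recovered from the objects themselves.
    static boolean sameRuntimeClass() {
        Map<String, String> strings = new HashMap<>();
        Map<Integer, Long> numbers = new HashMap<>();
        return strings.getClass() == numbers.getClass();
    }

    public static void main(String[] args) {
        System.out.println(runtimeClassName());  // prints "java.util.HashMap"
        System.out.println(sameRuntimeClass());  // prints "true"
    }
}
```

Because the element types cannot be read back from the objects in the collection, a framework that needs them has to be given that information explicitly, which is what the answer suggests doing when calling fromCollection.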
