Compilation fails when creating a MapFunction with a supertype

Question

I am using Flink 1.12.0 and have the following simple test case.

I defined two model classes (AbstractDataModel is the supertype, ConcreteModel is the subtype):

public interface AbstractDataModel {
    public String getValue();
}

public class ConcreteModel implements AbstractDataModel {
    private String key;
    private String value;

    public ConcreteModel() {
    }

    public ConcreteModel(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

Then I defined a simple application (code below) that maps each ConcreteModel to a String. MyMapFunction is declared against the supertype AbstractDataModel, but the compiler complains:

Required type:
MapFunction<com.ConcreteModel,java.lang.String>
Provided:
MyMapFunction

How can I fix this if I still want to use AbstractDataModel as the generic type of the MapFunction?

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class MyMapFunction implements MapFunction<AbstractDataModel, String> {

    public String map(AbstractDataModel model) throws Exception {
        return model.getValue();
    }
}

public class ConcreteModelTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //        env.registerType(ConcreteModel.class);
        //        env.registerType(AbstractDataModel.class);
        //
        DataStream<String> ds = env.fromElements(new ConcreteModel("a", "1"), new ConcreteModel("b", "2")).map(new MyMapFunction());
        ds.print();
        env.execute();
    }
}

Recommended answer

Part of the problem is that Flink needs type information for every stream, and it cannot treat the AbstractDataModel interface as a POJO type, so it has to fall back to a generic type. Flink reports this in its log:

15:45:51,460 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class … cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

You can implement ResultTypeQueryable and declare the function's produced type in its getProducedType() method. The produced type is the function's output, which is String here, so the method returns TypeInformation.of(String.class). The Flink documentation describes the interface as follows:

This interface can be implemented by functions and input formats to tell the framework about their produced data type. This method acts as an alternative to the reflection analysis that is otherwise performed and is useful in situations where the produced data type may vary depending on parametrization.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyMapFunction implements MapFunction<AbstractDataModel, String>,
    ResultTypeQueryable<String> {

    @Override
    public String map(AbstractDataModel value) throws Exception {
        return value.getValue();
    }

    // Describes the function's output type (String), not its input type.
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

public class ConcreteModelTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        AbstractDataModel concreteModel01 = new ConcreteModel("a", "1");
        AbstractDataModel concreteModel02 = new ConcreteModel("a", "2");
        DataStream<String> ds = env
                .fromElements(concreteModel01, concreteModel02)
                .map(new MyMapFunction());
        ds.print();
        env.execute();
    }
}
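
The compile error itself is a matter of Java generics rather than serialization. In the question, fromElements(new ConcreteModel(...), ...) is inferred as a stream of ConcreteModel, and map expects a MapFunction whose input type is exactly the stream's element type. Generic types are invariant, so a MapFunction<AbstractDataModel, String> is not accepted there. Declaring the elements as AbstractDataModel, as in the listing above, makes the stream's element type match the function. The following Flink-free sketch reproduces that behavior. Mapper and TypedStream are hypothetical stand-ins for MapFunction and DataStream, not Flink classes, and BoundedMapper is an extra variant assumed for illustration only; it has not been checked against Flink's type extraction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InvarianceDemo {

    interface AbstractDataModel {
        String getValue();
    }

    static class ConcreteModel implements AbstractDataModel {
        private final String value;
        ConcreteModel(String value) { this.value = value; }
        @Override public String getValue() { return value; }
    }

    // Hypothetical stand-in for MapFunction<T, R>.
    interface Mapper<T, R> {
        R map(T value);
    }

    // Hypothetical stand-in for a typed stream. Like DataStream.map in the
    // question, map() requires the mapper's input type to be exactly T.
    static class TypedStream<T> {
        private final List<T> elements;
        TypedStream(List<T> elements) { this.elements = elements; }

        @SafeVarargs
        static <T> TypedStream<T> fromElements(T... data) {
            return new TypedStream<>(Arrays.asList(data));
        }

        <R> List<R> map(Mapper<T, R> mapper) {
            List<R> out = new ArrayList<>();
            for (T element : elements) {
                out.add(mapper.map(element));
            }
            return out;
        }
    }

    // Mapper declared against the supertype, like MyMapFunction.
    static class SuperTypeMapper implements Mapper<AbstractDataModel, String> {
        @Override public String map(AbstractDataModel model) { return model.getValue(); }
    }

    // Variant with a bounded type parameter (an assumption of this sketch).
    static class BoundedMapper<T extends AbstractDataModel> implements Mapper<T, String> {
        @Override public String map(T model) { return model.getValue(); }
    }

    // Elements declared as the supertype: T is inferred as AbstractDataModel.
    public static List<String> viaDeclaredSuperType() {
        AbstractDataModel a = new ConcreteModel("1");
        AbstractDataModel b = new ConcreteModel("2");
        return TypedStream.fromElements(a, b).map(new SuperTypeMapper());
    }

    // Elements typed as ConcreteModel: T is inferred as ConcreteModel.
    public static List<String> viaBoundedTypeParameter() {
        TypedStream<ConcreteModel> stream =
            TypedStream.fromElements(new ConcreteModel("1"), new ConcreteModel("2"));
        // stream.map(new SuperTypeMapper()); // does not compile: Mapper is invariant in T
        return stream.map(new BoundedMapper<ConcreteModel>());
    }

    public static void main(String[] args) {
        System.out.println(viaDeclaredSuperType());
        System.out.println(viaBoundedTypeParameter());
    }
}
```

Both methods return the mapped values; only the commented-out call mirrors the failing line from the question.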

Alternatively, a simpler way is to pass TypeInformation.of(String.class) as the second argument to map. Then MyMapFunction does not need to implement ResultTypeQueryable.

DataStream<String> ds = env
    .fromElements(concreteModel01, concreteModel02)
    .map(new MyMapFunction(), TypeInformation.of(String.class));

Then just use your interface together with its implementing class:

public interface AbstractDataModel {
    public String getValue();
}
public class ConcreteModel implements AbstractDataModel {
    private String key;
    private String value;
    public ConcreteModel() {
    }
    public ConcreteModel(String key, String value) {
        this.key = key;
        this.value = value;
    }
    public String getKey() {
        return key;
    }
    public void setKey(String key) {
        this.key = key;
    }
    public String getValue() {
        return value;
    }
    public void setValue(String value) {
        this.value = value;
    }
}
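
The "reflection analysis" mentioned in the quoted interface description can be pictured with plain Java reflection. The sketch below is not Flink's TypeExtractor and makes no claim about how Flink implements it; Mapper is a hypothetical stand-in for MapFunction, and declaredTypeArgument is a helper invented for this example. It only shows that the type arguments a class declares for a generic interface can be read at runtime, which is the kind of information that explicit type information replaces.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ReflectionSketch {

    interface AbstractDataModel {
        String getValue();
    }

    // Hypothetical stand-in for MapFunction<T, R>.
    interface Mapper<T, R> {
        R map(T value);
    }

    static class MyMapper implements Mapper<AbstractDataModel, String> {
        @Override public String map(AbstractDataModel model) { return model.getValue(); }
    }

    // Returns the type argument that functionClass declares for Mapper at the
    // given index, or null if it is not a plain class (for example a type variable)
    // or if the class does not implement Mapper directly.
    public static Class<?> declaredTypeArgument(Class<?> functionClass, int index) {
        for (Type type : functionClass.getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                if (parameterized.getRawType() == Mapper.class) {
                    Type argument = parameterized.getActualTypeArguments()[index];
                    return argument instanceof Class ? (Class<?>) argument : null;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Input type argument declared by MyMapper.
        System.out.println(declaredTypeArgument(MyMapper.class, 0).getSimpleName());
        // Output type argument declared by MyMapper.
        System.out.println(declaredTypeArgument(MyMapper.class, 1).getSimpleName());
    }
}
```

Note that the declared input type here is the interface, while the elements at runtime are ConcreteModel instances; supplying the output type explicitly avoids relying on such an analysis.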
