Using protobuf with Flink


Question

I'm using Flink to read data from Kafka and convert it to protobuf. The problem I'm facing is that when I run the Java application I get the error below. If I rename the unknownFields variable to something else it works, but it's hard to make that change on every protobuf class.

I also tried to deserialize directly when reading from Kafka, but I'm not sure what TypeInformation should be returned from the getProducedType() method.

    public static class ProtoDeserializer implements DeserializationSchema<byte[]> {

        @Override
        public TypeInformation<byte[]> getProducedType() {
            // What should be returned here if I deserialize to protobuf instead?
            return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
        }
    }

All help is appreciated. Thanks.

    java.lang.RuntimeException: The field protected com.google.protobuf.UnknownFieldSet com.google.protobuf.GeneratedMessage.unknownFields is already contained in the hierarchy of the class com.google.protobuf.GeneratedMessage. Please use unique field names through your classes hierarchy
        at org.apache.flink.api.java.typeutils.TypeExtractor.getAllDeclaredFields(TypeExtractor.java:1594)
        at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1515)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1412)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1319)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:609)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:437)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:306)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:133)
        at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:529)

Code:

    FlinkKafkaConsumer09<byte[]> kafkaConsumer = new FlinkKafkaConsumer09<>("testArr",new ByteDes(),p);

    DataStream<byte[]> input = env.addSource(kafkaConsumer);
    DataStream<PBAddress> protoData = input.map(new RichMapFunction<byte[], PBAddress>() {
        @Override
        public PBAddress map(byte[] value) throws Exception {
            PBAddress addr = PBAddress.parseFrom(value);
            return addr;
        }
    });

Answer

https://issues.apache.org/jira/browse/FLINK-11333 is the JIRA ticket tracking the issue of implementing first-class support for Protobuf types with evolvable schemas. You'll see that there was a pull request quite some time ago, which hasn't been merged. I believe the problem was that it had no support for handling state migration in cases where Protobuf was previously being used by registering it with Kryo.
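As a sketch of the Kryo path mentioned above: the usual way to avoid the TypeExtractor error in the question is to keep Flink from analyzing the generated class as a POJO and route it through Kryo with a protobuf-aware serializer instead. The example below uses the PBAddress class from the question and the ProtobufSerializer from the third-party chill-protobuf library; the chill dependency is my assumption, not something the answer prescribes.

```java
// Assumes com.twitter:chill-protobuf on the classpath, plus the generated PBAddress class.
import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoProtobufSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Serialize all PBAddress instances via Kryo with a protobuf-aware serializer,
        // bypassing Flink's POJO analysis of GeneratedMessage (which trips over unknownFields).
        env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, ProtobufSerializer.class);
    }
}
```

With that registration in place, Flink falls back to generic (Kryo) serialization for the protobuf type instead of failing in the POJO type extractor.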

Meanwhile, the Stateful Functions project (statefun is a new API that runs on top of Flink) is based entirely on Protobuf, and it includes support for using Protobuf with Flink: https://github.com/apache/flink-statefun/tree/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/protobuf. (The entry point to that package is ProtobufTypeInformation.java.) I suggest exploring this package (it includes nothing statefun-specific); however, it doesn't concern itself with migration from Kryo either.
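To tie this back to the asker's second question (what getProducedType() should return when deserializing protobuf directly from Kafka): one possible sketch, assuming the message class is serialized generically rather than as a POJO, is to return a GenericTypeInfo for the generated class. This is an illustration under those assumptions, not the approach the answer prescribes; PBAddress is the asker's generated class, and the DeserializationSchema import location varies across Flink versions.

```java
// DeserializationSchema lived in org.apache.flink.streaming.util.serialization in older
// Flink versions (matching FlinkKafkaConsumer09); newer versions use this package.
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

public class ProtoDeserializer implements DeserializationSchema<PBAddress> {

    @Override
    public PBAddress deserialize(byte[] message) throws java.io.IOException {
        // Parse the raw Kafka record bytes directly into the protobuf message.
        return PBAddress.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(PBAddress nextElement) {
        return false; // the Kafka stream is unbounded
    }

    @Override
    public TypeInformation<PBAddress> getProducedType() {
        // GenericTypeInfo skips POJO analysis of the generated class (avoiding the
        // unknownFields error) and defers serialization to Kryo, where a
        // protobuf-aware serializer can be registered.
        return new GenericTypeInfo<>(PBAddress.class);
    }
}
```

This lets the map step in the question disappear: the consumer then emits PBAddress records directly.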
