Using protobuf with flink


Question


I'm using flink to read data from kafka and convert it to protobuf. The problem I'm facing is when I run the java application I get the below error. If I modify the unknownFields variable name to something else, it works but it's hard to make this change on all protobuf classes.


I also tried to deserialize directly when reading from kafka but I'm not sure what should be the TypeInformation to be returned for getProducedType() method.

    public static class ProtoDeserializer implements DeserializationSchema<byte[]> {

        @Override
        public byte[] deserialize(byte[] message) {
            return message; // pass the raw Kafka payload through unchanged
        }

        @Override
        public boolean isEndOfStream(byte[] nextElement) {
            return false;
        }

        @Override
        public TypeInformation<byte[]> getProducedType() {
            return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
        }
    }
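For the "deserialize directly" variant asked about above, a sketch of what the schema could look like if it parses the protobuf message itself, so the source emits `PBAddress` rather than `byte[]`. This is an illustration, not the accepted fix: `TypeInformation.of()` delegates to the same `TypeExtractor`, so on Flink versions where `GeneratedMessage` trips the `unknownFields` check it may need to be paired with a registered Kryo serializer.

```java
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hypothetical sketch: parse the protobuf payload inside the schema itself,
// so the Kafka source already emits PBAddress instead of byte[].
public static class PBAddressDeserializer implements DeserializationSchema<PBAddress> {

    @Override
    public PBAddress deserialize(byte[] message) throws IOException {
        return PBAddress.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(PBAddress nextElement) {
        return false; // unbounded Kafka stream
    }

    @Override
    public TypeInformation<PBAddress> getProducedType() {
        // Delegates to the TypeExtractor; may hit the same unknownFields
        // error unless a custom (e.g. Kryo) serializer is registered.
        return TypeInformation.of(PBAddress.class);
    }
}
```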

Appreciate all the help. Thanks.


    java.lang.RuntimeException: The field protected com.google.protobuf.UnknownFieldSet com.google.protobuf.GeneratedMessage.unknownFields is already contained in the hierarchy of the class com.google.protobuf.GeneratedMessage. Please use unique field names through your classes hierarchy
        at org.apache.flink.api.java.typeutils.TypeExtractor.getAllDeclaredFields(TypeExtractor.java:1594)
        at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1515)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1412)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1319)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:609)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:437)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:306)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:133)
        at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:529)

Code:

    FlinkKafkaConsumer09<byte[]> kafkaConsumer = new FlinkKafkaConsumer09<>("testArr",new ByteDes(),p);

    DataStream<byte[]> input = env.addSource(kafkaConsumer);
    DataStream<PBAddress> protoData = input.map(new RichMapFunction<byte[], PBAddress>() {
        @Override
        public PBAddress map(byte[] value) throws Exception {
            PBAddress addr = PBAddress.parseFrom(value);
            return addr;
        }
    });

Answer


https://issues.apache.org/jira/browse/FLINK-11333 is the JIRA ticket tracking the issue of implementing first-class support for Protobuf types with evolvable schema. You'll see that there was a pull request quite some time ago, which hasn't been merged. I believe the problem was that there is no support there for handling state migration in cases where Protobuf was previously being used by registering it with Kryo.


Meanwhile, the Stateful Functions project (statefun is a new API that runs on top of Flink) is based entirely on Protobuf, and it includes support for using Protobuf with Flink: https://github.com/apache/flink-statefun/tree/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/protobuf. (The entry point to that package is ProtobufTypeInformation.java.) I suggest exploring this package (it includes nothing statefun-specific); however, it doesn't concern itself with migrations from Kryo either.
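The answer's remark about "registering it with Kryo" refers to the common workaround of routing protobuf serialization through Kryo instead of Flink's POJO analysis. With the `com.twitter:chill-protobuf` dependency on the classpath, that registration typically looks like the sketch below (using `PBAddress` from the question's code):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.twitter.chill.protobuf.ProtobufSerializer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Serialize all PBAddress instances via Kryo with chill-protobuf's
// ProtobufSerializer, bypassing Flink's POJO field analysis (and thus
// the unknownFields error) entirely.
env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, ProtobufSerializer.class);
```

With this in place, the `DataStream<PBAddress>` from the question's map function is treated as a generic type rather than a POJO, at the cost of Kryo's serialization overhead and without the schema-evolution support tracked in FLINK-11333.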
