Using protobuf with flink

Question
I'm using Flink to read data from Kafka and convert it to protobuf. The problem I'm facing is that when I run the Java application I get the error below. If I modify the unknownFields variable name to something else, it works, but it's hard to make this change on all protobuf classes.
I also tried to deserialize directly when reading from Kafka, but I'm not sure what TypeInformation should be returned from the getProducedType() method.
    public static class ProtoDeserializer implements DeserializationSchema {

        @Override
        public TypeInformation getProducedType() {
            // TODO Auto-generated method stub
            return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
        }
    }
Appreciate all the help. Thanks.
    java.lang.RuntimeException: The field protected com.google.protobuf.UnknownFieldSet com.google.protobuf.GeneratedMessage.unknownFields is already contained in the hierarchy of the class com.google.protobuf.GeneratedMessage. Please use unique field names through your classes hierarchy
        at org.apache.flink.api.java.typeutils.TypeExtractor.getAllDeclaredFields(TypeExtractor.java:1594)
        at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1515)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1412)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1319)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:609)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:437)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:306)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:133)
        at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:529)
Code:
    FlinkKafkaConsumer09<byte[]> kafkaConsumer = new FlinkKafkaConsumer09<>("testArr", new ByteDes(), p);
    DataStream<byte[]> input = env.addSource(kafkaConsumer);
    DataStream<PBAddress> protoData = input.map(new RichMapFunction<byte[], PBAddress>() {
        @Override
        public PBAddress map(byte[] value) throws Exception {
            return PBAddress.parseFrom(value);
        }
    });
Answer
https://issues.apache.org/jira/browse/FLINK-11333 is the JIRA ticket tracking the implementation of first-class support for Protobuf types with an evolvable schema. You'll see that there was a pull request quite some time ago that hasn't been merged. I believe the problem was that it had no support for handling state migration in cases where Protobuf was previously being used by registering it with Kryo.
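For context, "registering it with Kryo" usually means pointing Flink's ExecutionConfig at a Kryo serializer for each generated message class. The sketch below assumes the com.twitter:chill-protobuf dependency and the PBAddress class from the question; it is an illustration of that registration pattern, not a fix endorsed by the JIRA ticket:

```java
import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RegisterProtobufWithKryo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tell Flink to serialize PBAddress via chill-protobuf's Kryo serializer
        // rather than treating the generated class as a POJO. Any other generated
        // message classes used in the job would need the same registration.
        env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, ProtobufSerializer.class);

        // ... build and execute the job as usual ...
    }
}
```

The trade-off, as noted above, is that state serialized this way has no schema-migration story, which is what the unmerged pull request ran into.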
与此同时,有状态功能项目(statefun是在Flink之上运行的新API)完全基于Protobuf,并且包括将Protobuf与Flink一起使用的支持:
Meanwhile, the Stateful Functions project (StateFun is a new API that runs on top of Flink) is based entirely on Protobuf, and it includes support for using Protobuf with Flink: https://github.com/apache/flink-statefun/tree/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/protobuf. (The entry point to that package is ProtobufTypeInformation.java.) I suggest exploring this package, which includes nothing statefun-specific; however, it doesn't concern itself with migrations from Kryo either.
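As for the question's getProducedType() puzzle: when you deserialize directly from Kafka, the schema's type parameter should be the message class itself, and a common pattern is to return TypeInformation.of(TheClass.class). This is only a sketch, assuming the question's PBAddress class; whether Flink then picks a workable serializer for it depends on the Flink version and on what is registered for the class, as discussed above. (On older versions, DeserializationSchema lives in org.apache.flink.streaming.util.serialization instead of the package shown here.)

```java
import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class PBAddressDeserializer implements DeserializationSchema<PBAddress> {

    @Override
    public PBAddress deserialize(byte[] message) throws IOException {
        // Parse the raw Kafka record bytes straight into the protobuf message.
        return PBAddress.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(PBAddress nextElement) {
        return false; // a Kafka topic is an unbounded stream
    }

    @Override
    public TypeInformation<PBAddress> getProducedType() {
        // Hands Flink an explicit type for the produced records; how PBAddress
        // is actually serialized downstream depends on the ExecutionConfig.
        return TypeInformation.of(PBAddress.class);
    }
}
```

The consumer would then be created as `new FlinkKafkaConsumer09<>("testArr", new PBAddressDeserializer(), p)`, removing the separate byte[]-to-PBAddress map step from the question.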