Flink Kafka - 自定义类数据始终为空 [英] Flink Kafka - Custom Class Data is always null
本文介绍了Flink Kafka - 自定义类数据始终为空的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
自定义类
人
class Person
{
private Integer id;
private String name;
//getters and setters
}
Kafka Flink 连接器
TypeInformation<Person> info = TypeInformation.of(Person.class);
TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema(info, new ExecutionConfig());
DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", schema , getKafkaProperties()));
现在如果我发送以下 json
Now if I send the below json
{ "id" : 1, "name": Synd }
通过Kafka Console Producer,flink代码抛出空指针异常但是,如果我使用 SimpleStringSchema
而不是之前定义的 CustomSchema,则会打印流.
through Kafka Console Producer, the flink code throws null pointer exception
But if I use SimpleStringSchema
instead of CustomSchema as defined before, the stream is getting printed.
上面的设置有什么问题
推荐答案
有相同问题的回答
自定义序列化程序
class PersonSchema implements DeserializationSchema<Person>{
private ObjectMapper mapper = new ObjectMapper(); //com.fasterxml.jackson.databind.ObjectMapper;
@Override
public Person deserialize(byte[] bytes) throws IOException {
return mapper.readValue( bytes, Person.class );
}
@Override
public boolean isEndOfStream(Person person) {
return false;
}
@Override
public TypeInformation<Person> getProducedType() {
return TypeInformation.of(new TypeHint<Person>(){});
}
}
使用架构
DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", new PersonSchema() , getKafkaProperties()));
这篇关于Flink Kafka - 自定义类数据始终为空的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文