Flink Kafka-自定义类数据始终为null [英] Flink Kafka - Custom Class Data is always null

查看:658
本文介绍了Flink Kafka-自定义类数据始终为null的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

自定义类

人员

class Person
{
  private Integer id;
  private String name; 
 //getters and setters
}

Kafka Flink连接器

TypeInformation<Person> info = TypeInformation.of(Person.class);
TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema(info, new ExecutionConfig());
DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", schema , getKafkaProperties()));

现在,如果我发送以下json

Now if I send the below json

{ "id" : 1, "name": Synd }

通过Kafka Console Producer,flink代码引发空指针异常 但是,如果我使用SimpleStringSchema而不是之前定义的CustomSchema,则该流将被打印.

through Kafka Console Producer, the flink code throws null pointer exception But if I use SimpleStringSchema instead of CustomSchema as defined before, the stream is getting printed.

上述设置有什么问题

推荐答案

回答相同问题的人

自定义序列化器

class PersonSchema implements DeserializationSchema<Person>{

    private ObjectMapper mapper = new ObjectMapper(); //com.fasterxml.jackson.databind.ObjectMapper;

    @Override
    public Person deserialize(byte[] bytes) throws IOException {
        return mapper.readValue( bytes, Person.class );
    }

    @Override
    public boolean isEndOfStream(Person person) {
        return false;
    }

    @Override
    public TypeInformation<Person> getProducedType() {
        return TypeInformation.of(new TypeHint<Person>(){});
    }
}

使用架构

DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", new PersonSchema() , getKafkaProperties()));

这篇关于Flink Kafka-自定义类数据始终为null的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆