Deserialise a POJO in Kafka Streams


Question

My Kafka topic has messages of this format:

user1,subject1,80|user1,subject2,90 

user2,subject1,70|user2,subject2,100 

and so on. 

I have created the User POJO as below.

class User implements Serializable {

    private static final long serialVersionUID = -253687203767610477L;

    private String userId;
    private String subject;
    private String marks;

    public User(String userId, String subject, String marks) {
        this.userId = userId;
        this.subject = subject;
        this.marks = marks;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public String getMarks() {
        return marks;
    }

    public void setMarks(String marks) {
        this.marks = marks;
    }
}

Further, I have configured the default key and value serialization as follows:

streamProperties.put(
            StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamProperties.put(
            StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
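
For context, these two entries sit in the usual Streams Properties object; the application id and broker address below are placeholders, not taken from the question:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties streamProperties = new Properties();
// Placeholder application id and broker address.
streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-marks-app");
streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Both key and value default to plain strings, matching the input topic.
streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());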

I am trying to find the count by userId as follows. I also need the User object for some other functionality later.

KTable<String, Long> wordCount = streamInput
    .flatMap(new KeyValueMapper<String, String, Iterable<KeyValue<String, User>>>() {
        @Override
        public Iterable<KeyValue<String, User>> apply(String key, String value) {
            // split on the literal '|' (escaped because split() takes a regex)
            String[] userObjects = value.split("\\|");
            List<KeyValue<String, User>> userList = new LinkedList<>();
            for (String userObject : userObjects) {
                String[] userData = userObject.split(",");
                userList.add(KeyValue.pair(userData[0],
                        new User(userData[0], userData[1], userData[2])));
            }
            return userList;
        }
    })
    .groupByKey()
    .count();

I am getting the following error:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.example.testing.dao.User). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

I think I need to provide the correct Serde for the User class.
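
A minimal hand-rolled Serde for this User class could look like the sketch below; the UserSerde name and the comma-delimited encoding are illustrative, chosen only to mirror the input format:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// Illustrative Serde that encodes a User as "userId,subject,marks" text.
// Lambdas work here because Serializer and Deserializer are functional
// interfaces from Kafka 2.1 onwards (the same version that introduced Grouped).
public class UserSerde {

    public static Serde<User> instance() {
        Serializer<User> serializer = (topic, user) -> user == null
                ? null
                : String.join(",", user.getUserId(), user.getSubject(), user.getMarks())
                        .getBytes(StandardCharsets.UTF_8);

        Deserializer<User> deserializer = (topic, bytes) -> {
            if (bytes == null) {
                return null;
            }
            String[] fields = new String(bytes, StandardCharsets.UTF_8).split(",");
            return new User(fields[0], fields[1], fields[2]);
        };

        return Serdes.serdeFrom(serializer, deserializer);
    }
}

A JSON Serde (for example Jackson-based) would work just as well, as long as serializer and deserializer agree; note that a Jackson approach would also need a no-argument constructor on User.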

Answer

The problem is with the value Serdes.

There are two versions of KStream#groupByKey:

  • KGroupedStream<K, V> groupByKey();
  • KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped);

Under the hood, the first version calls the second one with a Grouped built from the default Serdes (in your case StringSerde for both the key and the value).

Your flatMap maps each message to KeyValue<String, User>, so the value type is User, not String.

The solution in your case is to call groupByKey(Grouped.with(keySerde, valSerde)) with the proper Serdes instead of the plain groupByKey().
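
For example, assuming a User Serde like the sketch above (UserSerde and the topic name "user-topic" are placeholders, not part of the original question), the corrected topology could look like this:

import java.util.LinkedList;
import java.util.List;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
Serde<User> userSerde = UserSerde.instance();                 // illustrative Serde from the sketch above

// "user-topic" stands in for the real input topic name.
KStream<String, String> streamInput = builder.stream("user-topic");

KStream<String, User> userStream = streamInput.flatMap((key, value) -> {
    List<KeyValue<String, User>> users = new LinkedList<>();
    for (String userEntry : value.split("\\|")) {             // '|' escaped because split() takes a regex
        String[] fields = userEntry.split(",");
        users.add(KeyValue.pair(fields[0], new User(fields[0], fields[1], fields[2])));
    }
    return users;
});

// Passing explicit serdes here is the fix: the repartition topic created by the
// grouping now knows how to (de)serialise User values instead of falling back
// to the default StringSerde.
KTable<String, Long> countByUser = userStream
        .groupByKey(Grouped.with(Serdes.String(), userSerde))
        .count();

Keeping String as the default serde and overriding it only at the grouping step is the least invasive fix, since the source topic itself still carries plain strings.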

