Deserialise a POJO in Kafka Streams


Question

My Kafka topic has messages in this format:

user1,subject1,80|user1,subject2,90 

user2,subject1,70|user2,subject2,100 

and so on. 

I have created the User POJO as below.

class User implements Serializable {

    private static final long serialVersionUID = -253687203767610477L;

    private String userId;
    private String subject;
    private String marks;

    public User(String userId, String subject, String marks) {
        super();
        this.userId = userId;
        this.subject = subject;
        this.marks = marks;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public String getMarks() {
        return marks;
    }

    public void setMarks(String marks) {
        this.marks = marks;
    }
}

I have also configured default key and value Serdes:

streamProperties.put(
            StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamProperties.put(
            StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
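
For context, those two entries sit alongside the rest of the Streams configuration; a minimal setup might look like the following sketch (the application id and bootstrap servers are placeholders, not values from the original post):

Properties streamProperties = new Properties();
streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-marks-app");     // placeholder
streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
streamProperties.put(
        StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamProperties.put(
        StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());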

I am trying to find a count by user ID as follows. I also need the User object to perform some other functionality.

KTable<String, Long> wordCount = streamInput
    .flatMap(new KeyValueMapper<String, String, Iterable<KeyValue<String, User>>>() {

        @Override
        public Iterable<KeyValue<String, User>> apply(String key, String value) {
            // "|" is a regex metacharacter, so it must be escaped to split on a literal pipe
            String[] userObjects = value.split("\\|");
            List<KeyValue<String, User>> userList = new LinkedList<>();
            for (String userObject : userObjects) {
                String[] userData = userObject.split(",");
                userList.add(KeyValue.pair(userData[0],
                        new User(userData[0], userData[1], userData[2])));
            }
            return userList;
        }
    })
    .groupByKey()
    .count();

I am getting the following error:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.example.testing.dao.User). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

I think I need to provide the correct Serde for the User class.

Answer

The problem is with the value Serde.

There are two versions of the groupByKey function:

  • KGroupedStream<K, V> groupByKey();
  • KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped);

Under the hood, the first version calls the second one with a Grouped built from the default Serdes (in your case, StringSerde for both the key and the value).
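
Roughly speaking (a sketch of the behaviour, not the actual library code; userStream stands for the KStream<String, User> produced by the flatMap in the question):

// groupByKey() takes no Grouped, so it resolves its Serdes at run time from the
// configured defaults (DEFAULT_KEY_SERDE_CLASS_CONFIG / DEFAULT_VALUE_SERDE_CLASS_CONFIG).
// With StringSerde configured for both, the value side ends up trying to serialize
// a User with a StringSerializer, which is exactly the exception shown above.
KGroupedStream<String, User> grouped = userStream.groupByKey();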

Your flatMap maps each message to a KeyValue<String, User>, so the value is of type User.

The solution in your case is to call groupByKey(Grouped.with(keySerde, valSerde)) with the proper Serdes instead of the no-argument groupByKey().
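
A minimal sketch of what that could look like, assuming a JSON Serde for User built with Jackson (the UserSerde class below is illustrative, not from the original post; for Jackson to deserialize it, User would also need a no-argument constructor or equivalent annotations):

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Grouped;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative JSON-based Serde for User; any serialization format would work
// as long as it can round-trip the User object.
class UserSerde implements Serde<User> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Serializer<User> serializer() {
        return new Serializer<User>() {
            @Override
            public byte[] serialize(String topic, User user) {
                try {
                    return user == null ? null : MAPPER.writeValueAsBytes(user);
                } catch (Exception e) {
                    throw new RuntimeException("Failed to serialize User", e);
                }
            }
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }
            @Override
            public void close() { }
        };
    }

    @Override
    public Deserializer<User> deserializer() {
        return new Deserializer<User>() {
            @Override
            public User deserialize(String topic, byte[] bytes) {
                try {
                    return bytes == null ? null : MAPPER.readValue(bytes, User.class);
                } catch (Exception e) {
                    throw new RuntimeException("Failed to deserialize User", e);
                }
            }
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }
            @Override
            public void close() { }
        };
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }
    @Override
    public void close() { }
}

// Wiring it in: only the grouping step changes. userStream stands for the
// KStream<String, User> produced by the flatMap in the question.
KTable<String, Long> wordCount = userStream
        .groupByKey(Grouped.with(Serdes.String(), new UserSerde()))
        .count();

Any other format (Avro, Protobuf, or a hand-rolled delimited encoding) works the same way, as long as the Serde handed to Grouped.with matches the actual value type.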
