如何在Kafka流中使用HashMap作为值创建状态存储? [英] How to create a state store with HashMap as value in Kafka streams?

查看:340
本文介绍了如何在Kafka流中使用HashMap作为值创建状态存储?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要使用字符串键HashMap作为值创建一个状态存储.我尝试了以下两种方法.

I need to create a state store with String key HashMap as value. I tried the below two methods.

// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
          .withKeys(Serdes.String())
          .withValues(HashMap.class)
          .persistent()
          .build();

// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();

StateStoreSupplier avgStore1 = Stores.create("Avgs")
          .withKeys(Serdes.String())
          .withValues(Serdes.serdeFrom(h.getClass()))
          .persistent()
          .build();

代码可以正常编译,没有任何错误,但是出现运行时错误

The code compiles fine without any error, but I get a runtime error

io.confluent.examples.streams.WordCountProcessorAPIException in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer

有人可以建议我创建状态存储的正确方法是什么吗?

Can someone suggest me what is the correct way to create a state store?

推荐答案

如果要创建状态存储,则需要提供 serializer deserializer 类您要使用的类型.在Kafka Stream中,只有一个称为 Serde 的抽象,它将序列化程序和反序列化程序包装在一个类中.

If you want to create a state store, you need to provide a serializer and deserializer class for the type you want to use. In Kafka Stream, there is a single abstraction called Serde that wraps serializer and deserializer in a single class.

如果您使用.withValues(Class<K> keyClass),则必须保持该

If you use .withValues(Class<K> keyClass) it must hold that

@param keyClass密钥的类,该类必须是Kafka内置Serdes的类型之一

@param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes

因为对于HashMap没有内置的Serdes,您需要首先实现一个(可能称为HashMapSerde)并将此类提供给方法.withValues(Serde<K> keySerde).此外,您还必须为HashMap实现实际的序列化器和反序列化器.如果您知道HashMap的通用类型,则应指定它们(这使得序列化器和反序列化器的实现更加简单.

Because there is no built-in Serdes for HashMap you need to implement one first (maybe called HashMapSerde) and give this class to the method .withValues(Serde<K> keySerde). Furhtermore, you must implement the actual serializer and deserializer for HashMap, too. If you know the generic types of your HashMap, you should specify them (what make the implementation of serializer and deserializer much simpler.

类似这样的东西(只是一个草图;省略了通用类型):

Something like this (just a sketch; generic types omitted):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;

public class HashMapSerde implements Serde<HashMap> {

    void configure(Map<String, ?> configs, boolean isKey) {
        /* put your code here */
    }

    void close() {
        /* put your code here */
    }

    Serializer<HashMap> serializer() {
        return new Serializer<HashMap>() {
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            public byte[] serialize(String topic, T data) {
                /* put your code here */
            }

            public void close() {
                /* put your code here */
            }
        };
    }

    Deserializer<HashMap> deserializer() {
        return new Deserializer<HashMap>() {
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            public T deserialize(String topic, byte[] data) {
                /* put your code here */
            }

            public void close() {
                /* put your code here */
            }
        };
    }
}

如果要查看有关实现(反)序列化器和Serde的示例,请查看 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java

If you want to see examples for how to implement (de)serializers and Serde, have a look into https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization and https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java

这篇关于如何在Kafka流中使用HashMap作为值创建状态存储?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆