How to create a state store with HashMap as value in Kafka Streams?
Question
I need to create a state store with a String key and a HashMap as the value. I tried the following two approaches.
// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
    .withKeys(Serdes.String())
    .withValues(HashMap.class)
    .persistent()
    .build();

// Second method
HashMap<String, Double> h = new HashMap<String, Double>();
StateStoreSupplier avgStore1 = Stores.create("Avgs")
    .withKeys(Serdes.String())
    .withValues(Serdes.serdeFrom(h.getClass()))
    .persistent()
    .build();
The code compiles without errors, but I get a runtime error:
io.confluent.examples.streams.WordCountProcessorAPI
Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer
Can someone suggest the correct way to create a state store?
Recommended answer
If you want to create a state store, you need to provide a serializer and a deserializer for the type you want to use. Kafka Streams has an abstraction called Serde that wraps the serializer and the deserializer in a single class.
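If you use .withValues(Class<V> valueClass), the class you pass must satisfy the constraint stated in the Javadoc:
@param valueClass the class for the values, which must be one of the types for which Kafka has built-in serdes
Serdes.serdeFrom(Class) has the same restriction, which is why both of your attempts throw the "Unknown class for built-in serializer" exception.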
Because there is no built-in Serde for HashMap, you need to implement one first (maybe called HashMapSerde) and pass an instance of it to the method .withValues(Serde<V> valueSerde). Furthermore, you must implement the actual serializer and deserializer for HashMap, too. If you know the generic types of your HashMap, you should specify them, which makes the implementation of the serializer and deserializer much simpler.
Something like this (just a sketch; generic types omitted):
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Sketch only: the raw HashMap type is used; prefer concrete generic types in real code.
public class HashMapSerde implements Serde<HashMap> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        /* put your code here */
    }

    @Override
    public void close() {
        /* put your code here */
    }

    @Override
    public Serializer<HashMap> serializer() {
        return new Serializer<HashMap>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            @Override
            public byte[] serialize(String topic, HashMap data) {
                /* put your code here: convert the map to bytes */
                throw new UnsupportedOperationException("not implemented yet");
            }

            @Override
            public void close() {
                /* put your code here */
            }
        };
    }

    @Override
    public Deserializer<HashMap> deserializer() {
        return new Deserializer<HashMap>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            @Override
            public HashMap deserialize(String topic, byte[] data) {
                /* put your code here: rebuild the map from bytes */
                throw new UnsupportedOperationException("not implemented yet");
            }

            @Override
            public void close() {
                /* put your code here */
            }
        };
    }
}
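To make the placeholder bodies above more concrete, here is a minimal sketch of one possible byte encoding, assuming the map is a HashMap<String, Double> (as in the question) with non-null keys and values. HashMapCodec is a hypothetical helper, not part of Kafka; it uses only the JDK so it can be tried on its own, and the byte layout is just one choice among many.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Kafka class): byte encoding for HashMap<String, Double>.
final class HashMapCodec {
    private HashMapCodec() {}

    // Layout: entry count (int), then for each entry a UTF key followed by a double value.
    static byte[] serialize(Map<String, Double> data) {
        if (data == null) {
            return null; // same convention as Kafka's built-in serializers: null stays null
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(data.size());
            for (Map.Entry<String, Double> entry : data.entrySet()) {
                out.writeUTF(entry.getKey());      // assumes non-null keys under 64 KB encoded
                out.writeDouble(entry.getValue()); // assumes non-null values
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    // Reads back exactly the layout written by serialize().
    static HashMap<String, Double> deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            HashMap<String, Double> map = new HashMap<>();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                map.put(key, in.readDouble());
            }
            return map;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

Inside HashMapSerde, serialize(topic, data) could then delegate to HashMapCodec.serialize(data) and deserialize(topic, data) to HashMapCodec.deserialize(data). A JSON or Avro encoding would work just as well; the only requirement is that the deserializer reverses what the serializer wrote.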
If you want to see examples of how to implement (de)serializers and a Serde, have a look at https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization and https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java