How to add a custom StateStore to the Kafka Streams DSL processor?
Problem description
For one of my Kafka Streams apps, I need to use features of both the DSL and the Processor API. My streaming app flow is:
source -> selectKey -> filter -> aggregate (on a window) -> sink
After aggregation, I need to send a SINGLE aggregated message to the sink, so I define my topology as follows:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor());
I define a custom StateStore and register it with my processor as follows:
public class MyProcessor implements Processor<String, String> {
    private ProcessorContext context = null;
    Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);
    KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
        .withKeys(Serdes.String())
        .withValues(invSerde)
        .persistent()
        .build()
        .get();

    public MyProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.register(invStore, false, null); // register the store
        this.context.schedule(10 * 60 * 1000L);
    }

    @Override
    public void process(String partitionKey, String message) {
        try {
            MessageModel smb = new MessageModel(message);
            HashMapStore oldStore = invStore.get(partitionKey);
            if (oldStore == null) {
                oldStore = new HashMapStore();
            }
            oldStore.addSmb(smb);
            invStore.put(partitionKey, oldStore);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // processes all the messages in the state store and sends single aggregate message
    }

    @Override
    public void close() {
        invStore.close();
    }
}
When I run the app, I get a java.lang.NullPointerException:
Exception in thread "StreamThread-18" java.lang.NullPointerException
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:332)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Any idea what's going wrong here?
Recommended answer
You need to register your store outside of your processor, using StreamsBuilder (or KStreamBuilder in older releases). First create the store, then register it with StreamsBuilder (KStreamBuilder), and when you add the processor, provide the store name to connect the processor to the store. Inside the processor, obtain the store in init() via context.getStateStore("invStore") instead of building and registering it there.
StreamsBuilder builder = new StreamsBuilder();
// create store
StoreBuilder<KeyValueStore<String, HashMapStore>> storeBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("invStore"),
    Serdes.String(),
    invSerde);
// register store
builder.addStateStore(storeBuilder);
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name
// older API:
KStreamBuilder builder = new KStreamBuilder();
// create store (build() returns the supplier, so no cast to KeyValueStore)
StateStoreSupplier storeSupplier = Stores.create("invStore")
    .withKeys(Serdes.String())
    .withValues(invSerde)
    .persistent()
    .build();
// register store
builder.addStateStore(storeSupplier);
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name
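The snippets above cover only the builder side. For the processor side, here is a minimal sketch of how the store could be looked up by name rather than created inside the processor. It assumes Kafka Streams 3.3 or later (the typed org.apache.kafka.streams.processor.api classes) with kafka-streams on the classpath. The names StoreWiring, CountingProcessor, input-topic and output-topic are made up for illustration, and to stay self-contained the store holds a Long count per key instead of the question's HashMapStore.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Hypothetical example class; not part of the original question or answer.
public class StoreWiring {
    static final String STORE_NAME = "invStore";

    // Counts messages per key and emits the counts on a wall-clock schedule.
    static class CountingProcessor implements Processor<String, String, String, Long> {
        private ProcessorContext<String, Long> context;
        private KeyValueStore<String, Long> store;

        @Override
        public void init(ProcessorContext<String, Long> context) {
            this.context = context;
            // Look up the store that was registered on the builder; do not create it here.
            this.store = context.getStateStore(STORE_NAME);
            context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME, this::emitAndReset);
        }

        @Override
        public void process(Record<String, String> record) {
            Long current = store.get(record.key());
            store.put(record.key(), current == null ? 1L : current + 1L);
        }

        // Forwards one record per key downstream, then clears that key from the store.
        private void emitAndReset(long timestamp) {
            List<KeyValue<String, Long>> entries = new ArrayList<>();
            try (KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    entries.add(it.next());
                }
            }
            for (KeyValue<String, Long> entry : entries) {
                context.forward(new Record<>(entry.key, entry.value, timestamp));
                store.delete(entry.key);
            }
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE_NAME), Serdes.String(), Serdes.Long());
        builder.addStateStore(storeBuilder); // register the store with the topology
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .filterNot((k, v) -> "UnknownGroup".equals(k))
               .process(() -> new CountingProcessor(), STORE_NAME) // connect the store by name
               .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```

Because the store is owned by the topology, Kafka Streams initializes, flushes and closes it, so the processor does not need to call close() on it. On the older untyped Processor interface used in the question, the same lookup should be available as context.getStateStore("invStore") with a cast to KeyValueStore.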