How to add a custom StateStore to the Kafka Streams DSL processor?

Question

For one of my Kafka streams apps, I need to use the features of both DSL and Processor API. My streaming app flow is

source -> selectKey -> filter -> aggregate (on a window) -> sink

After aggregation I need to send a SINGLE aggregated message to the sink. So I define my topology as below

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor());

I define a custom StateStore and register it with my processor as below

public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context = null;
    Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);


    KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
        .withKeys(Serdes.String())
        .withValues(invSerde)
        .persistent()
        .build()
        .get();

    public MyProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.register(invStore, false, null); // register the store
        this.context.schedule(10 * 60 * 1000L);
    }

    @Override
    public void process(String partitionKey, String message) {
        try {
            MessageModel smb = new MessageModel(message);
            HashMapStore oldStore = invStore.get(partitionKey);
            if (oldStore == null) {
                oldStore = new HashMapStore();
            }
            oldStore.addSmb(smb);
            invStore.put(partitionKey, oldStore);
        } catch (Exception e) {
           e.printStackTrace();
        }
    }

    @Override
    public void punctuate(long timestamp) {
       // processes all the messages in the state store and sends single aggregate message
    }


    @Override
    public void close() {
        invStore.close();
    }
}

When I run the app, I get java.lang.NullPointerException

Exception in thread "StreamThread-18" java.lang.NullPointerException
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:332)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

Any idea what's going wrong here?

Recommended answer

You need to register your store outside of your processor, using StreamsBuilder (or KStreamBuilder in older releases). First create the store, then register it with StreamsBuilder (KStreamBuilder), and when you add the processor, pass the store name to connect the processor to the store. Inside the processor, obtain the store in init() via context.getStateStore("invStore") rather than building and registering it there.

StreamsBuilder builder = new StreamsBuilder();

// create store
StoreBuilder<KeyValueStore<String, HashMapStore>> storeBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("invStore"),
    Serdes.String(),
    invSerde);
// register store
builder.addStateStore(storeBuilder);

KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
        .filterNot((k,v) -> k.equals("UnknownGroup"))
        .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name


// older API:

KStreamBuilder builder = new KStreamBuilder();

// create store
StateStoreSupplier storeSupplier = Stores.create("invStore")
    .withKeys(Serdes.String())
    .withValues(invSerde)
    .persistent()
    .build();
// register store
builder.addStateStore(storeSupplier);

KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
        .filterNot((k,v) -> k.equals("UnknownGroup"))
        .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name
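
The question's punctuate() is only a comment ("processes all the messages in the state store and sends single aggregate message"). As a rough illustration of that buffer-then-flush logic, here is a plain-Java sketch that uses no Kafka classes at all: a HashMap stands in for the "invStore" state store, and the class name BufferingAggregator and the comma-joined aggregate format are hypothetical choices made for this example, not part of the original code. In a real processor the store would be the one looked up by name from the ProcessorContext, and the aggregate would be forwarded downstream instead of returned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the buffer-then-flush pattern; no Kafka classes are used. */
class BufferingAggregator {

    // Stands in for the key-value state store ("invStore" in the question).
    private final Map<String, List<String>> store = new HashMap<>();

    /** Mirrors process(): append the incoming message to the buffer kept for its key. */
    public void process(String key, String message) {
        store.computeIfAbsent(key, k -> new ArrayList<>()).add(message);
    }

    /**
     * Mirrors punctuate(): build one aggregate per key from everything buffered
     * so far, then clear the buffer so the next interval starts empty.
     * The comma-joined string is a placeholder for a real aggregate message.
     */
    public Map<String, String> punctuate() {
        Map<String, String> aggregates = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : store.entrySet()) {
            aggregates.put(entry.getKey(), String.join(",", entry.getValue()));
        }
        store.clear();
        return aggregates;
    }
}
```

Calling process("g1", "a") and process("g1", "b") followed by punctuate() yields a single entry for "g1" whose value combines both messages; a second punctuate() with no new input returns an empty map.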
