How to add a custom StateStore to the Kafka Streams DSL processor?

Question

For one of my Kafka Streams apps, I need to use features of both the DSL and the Processor API. My streaming app's flow is:

source -> selectKey -> filter -> aggregate (on a window) -> sink

After aggregation I need to send a SINGLE aggregated message to the sink, so I define my topology as below:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor());

I define a custom StateStore and register it with my processor as below:

public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context = null;
    Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);


    KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
        .withKeys(Serdes.String())
        .withValues(invSerde)
        .persistent()
        .build()
        .get();

    public MyProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.register(invStore, false, null); // register the store
        this.context.schedule(10 * 60 * 1000L);
    }

    @Override
    public void process(String partitionKey, String message) {
        try {
            MessageModel smb = new MessageModel(message);
            HashMapStore oldStore = invStore.get(partitionKey);
            if (oldStore == null) {
                oldStore = new HashMapStore();
            }
            oldStore.addSmb(smb);
            invStore.put(partitionKey, oldStore);
        } catch (Exception e) {
           e.printStackTrace();
        }
    }

    @Override
    public void punctuate(long timestamp) {
       // processes all the messages in the state store and sends single aggregate message
    }


    @Override
    public void close() {
        invStore.close();
    }
}

When I run the app, I get a java.lang.NullPointerException:

线程"StreamThread-18"中的异常java.lang.NullPointerException 在org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167) 在org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:332) 在org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252) 在org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446) 在org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434) 在org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422) 在org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340) 在org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

Exception in thread "StreamThread-18" java.lang.NullPointerException
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:332)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

Any idea what's going wrong here?

Answer

You need to register the store outside of your processor, using StreamsBuilder (or KStreamBuilder in older releases). First you create the store, then you register it with StreamsBuilder (KStreamBuilder), and when you add the processor you provide the store name to connect the processor and the store.

StreamsBuilder builder = new StreamsBuilder();

// create store
StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("invStore"),
    Serdes.String(),
    invSerde);
// register store
builder.addStateStore(storeBuilder);

KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
        .filterNot((k,v) -> k.equals("UnknownGroup"))
        .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name


// older API:

KStreamBuilder builder = new KStreamBuilder();

// create store
StateStoreSupplier storeSupplier = Stores.create("invStore")
    .withKeys(Serdes.String())
    .withValues(invSerde)
    .persistent()
    .build();
// register store
builder.addStateStore(storeSupplier);

KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
        .filterNot((k,v) -> k.equals("UnknownGroup"))
        .process(() -> new MyProcessor(), "invStore"); // connect store to processor by providing store name
