Kafka Streams:如何使用persistentKeyValueStore 从磁盘重新加载现有消息? [英] Kafka Streams: How to use persistentKeyValueStore to reload existing messages from disk?

查看:48
本文介绍了Kafka Streams:如何使用persistentKeyValueStore 从磁盘重新加载现有消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的代码当前使用的是 InMemoryKeyValueStore,它避免了对磁盘或 kafka 的任何持久性.我想使用rocksdb(Stores.persistentKeyValueStore),以便应用程序从磁盘重新加载状态.我正在尝试实现这一点,而且我对 Kafka 和流 API 非常陌生.非常感谢有关我如何进行更改的帮助,同时我仍然尝试在进行过程中理解内容.

My code is currently using an InMemoryKeyValueStore, which avoids any persistence to disk or to kafka. I want to use rocksdb (Stores.persistentKeyValueStore) so that the app will reload state from disk. I'm trying to implement this, and I'm very new to Kafka and the streams API. Would appreciate help on how I might make changes, while I still try to understand stuff as I go.

我试图在这里创建状态存储:

I tried to create the state store here:

StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> store =
                Stores.<String, LinkedList<StoreItem>>keyValueStoreBuilder(Stores.persistentKeyValueStore(storeKey), Serdes.String(), valueSerde);

如何在流构建器中注册它?

How do I register it with the streams builder?

使用 inMemoryKeyValueStore 的现有代码:

Existing code which uses the inMemoryKeyValueStore:

   static StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> makeStoreBuilder(
            final String storeKey,
            final Serde<LinkedList<StoreItem>> valueSerde,
            final boolean loggingDisabled) {

        final StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> storeBuilder =
                Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeKey), Serdes.String(), valueSerde);
        return storeBuilder;
    }

我需要确保流应用程序不会在每次重新启动时丢失日志主题中的现有消息.

I need to ensure that the streams app will not end up missing existing messages in the log topic each time it restarts.

推荐答案

您使用持久存储的方式与内存中存储完全相同.其余的由 store 负责,您无需担心加载数据等.您只需使用它即可.

You use a persistent store the exact some way as an in-memory store. The store takes care of the rest and you don't need to worry about loading data etc. You just use it.

这篇关于Kafka Streams:如何使用persistentKeyValueStore 从磁盘重新加载现有消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆