Kafka Streams:如何使用persistentKeyValueStore从磁盘重新加载现有消息? [英] Kafka Streams: How to use persistentKeyValueStore to reload existing messages from disk?

查看:208
本文介绍了Kafka Streams:如何使用persistentKeyValueStore从磁盘重新加载现有消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的代码当前正在使用InMemoryKeyValueStore,这样可以避免对磁盘或kafka的任何持久性. 我想使用rocksdb(Stores.persistentKeyValueStore),以便该应用程序将从磁盘重新加载状态.我正在尝试实现这一点,而对于Kafka和streams API来说,我还是一个新手.希望能对我做出的更改有所帮助,同时我仍然会尽力理解所学内容.

My code is currently using an InMemoryKeyValueStore, which avoids any persistence to disk or to kafka. I want to use rocksdb (Stores.persistentKeyValueStore) so that the app will reload state from disk. I'm trying to implement this, and I'm very new to Kafka and the streams API. Would appreciate help on how I might make changes, while I still try to understand stuff as I go.

我试图在这里创建状态存储:

I tried to create the state store here:

StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> store =
                Stores.<String, LinkedList<StoreItem>>keyValueStoreBuilder(Stores.persistentKeyValueStore(storeKey), Serdes.String(), valueSerde);

如何在流构建器中注册它?

How do I register it with the streams builder?

使用inMemoryKeyValueStore的现有代码:

Existing code which uses the inMemoryKeyValueStore:

   static StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> makeStoreBuilder(
            final String storeKey,
            final Serde<LinkedList<StoreItem>> valueSerde,
            final boolean loggingDisabled) {

        final StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> storeBuilder =
                Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeKey), Serdes.String(), valueSerde);
        return storeBuilder;
    }

我需要确保每次重启后,流应用程序都不会最终丢失日志主题中的现有消息.

I need to ensure that the streams app will not end up missing existing messages in the log topic each time it restarts.

推荐答案

您将永久存储完全用作内存存储.商店负责其余的工作,您无需担心加载数据等问题.您只需使用它即可.

You use a persistent store the exact some way as an in-memory store. The store takes care of the rest and you don't need to worry about loading data etc. You just use it.

这篇关于Kafka Streams:如何使用persistentKeyValueStore从磁盘重新加载现有消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆