Kafka Streams local state stores

Question

I have a simple streams application that takes one topic as an input stream and transforms its KeyValues into another, like:

// In-memory key-value store registered under CategoryTransformer.STORE_NAME
StoreBuilder<KeyValueStore<Long, CategoryDto>> builder =
        Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),
                                    Serdes.Long(), CATEGORY_JSON_SERDE);

// Register the store, then connect it to the transformer by name
streamsBuilder.addStateStore(builder)
              .stream(categoryTopic, Consumed.with(Serdes.Long(), CATEGORY_JSON_SERDE))
              .transform(CategoryTransformer::new, CategoryTransformer.STORE_NAME);

static class CategoryTransformer implements Transformer<Long, CategoryDto, KeyValue<Long, CategoryDto>> {

    static final String STORE_NAME = "test-store";

    private KeyValueStore<Long, CategoryDto> store;

    @Override
    public void init(ProcessorContext context) {
        // Look up the store that was connected to this transformer by name
        store = (KeyValueStore<Long, CategoryDto>) context.getStateStore(STORE_NAME);
    }

    @Override
    public KeyValue<Long, CategoryDto> transform(Long key, CategoryDto value) {
        // Record the latest value per key, then forward the pair unchanged
        store.put(key, value);
        return KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<Long, CategoryDto> punctuate(long timestamp) {
        // No periodic output
        return null;
    }

    @Override
    public void close() {
        // Nothing to clean up; Kafka Streams manages the store's lifecycle
    }
}

Here I had to use a transformer because I need to fetch the store and update the relevant value.

The question is: what is the difference between using local state stores and just putting values into a simple HashMap inside a ForeachAction?

What is the advantage of using local state stores in this case?

Recommended answer

Although it is not shown in your code, I'm assuming you somehow read and use the stored state.

Storing your state in a simple in-memory HashMap makes it not persistent at all: your state will be lost whenever any of the following happens (none of these is out of the ordinary; assume they will happen quite often):

  • your stream processor/application stops,
  • it crashes, or
  • part of it is migrated elsewhere (to another JVM) because of a rebalance.

The problem with non-persistent state is that when any of the above happens, kafka-streams restarts processing at the last committed offset. Records before that offset are not reprocessed, so when processing restarts your HashMap is empty and never sees them again. This is certainly not what you want.
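To make this failure mode concrete, here is a minimal plain-Java sketch that does not use Kafka at all. It is a simplified model under stated assumptions: one input partition is modelled as a `List`, the committed offset as an `int`, and a crash, stop, or rebalance as discarding the processor object and creating a new one. The class names `HashMapRestartDemo`, `Rec`, and `HashMapProcessor` are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HashMapRestartDemo {
    public static void main(String[] args) {
        // Hypothetical input partition with offsets 0, 1, 2
        List<Rec> partition = List.of(
                new Rec(1L, "books"), new Rec(2L, "music"), new Rec(3L, "games"));

        HashMapProcessor first = new HashMapProcessor();
        int committed = first.runUntil(partition, 0, 2); // offsets 0-1 processed, next offset 2 committed

        // Crash, stop or rebalance: the old object and its HashMap are gone
        HashMapProcessor restarted = new HashMapProcessor();
        restarted.runUntil(partition, committed, partition.size()); // resume at the committed offset

        System.out.println(restarted.state); // prints {3=games}: keys 1 and 2 are lost
    }
}

// A record from the input topic (hypothetical model)
final class Rec {
    final long key;
    final String value;

    Rec(long key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Keeps its state in a plain HashMap, like a HashMap updated from a ForeachAction
class HashMapProcessor {
    final Map<Long, String> state = new HashMap<>();

    // Processes offsets [from, to) and returns the next offset to commit
    int runUntil(List<Rec> partition, int from, int to) {
        for (int offset = from; offset < to; offset++) {
            Rec r = partition.get(offset);
            state.put(r.key, r.value);
        }
        return to;
    }
}
```

Nothing in the model rebuilds the map, because the only source of records after the restart is the input partition from the committed offset onward.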

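On the other hand, if you use one of the provided state stores, kafka-streams ensures that once processing restarts after any of the interruptions listed above, the state is available as if processing had never stopped, without reprocessing the already-committed records. It does this by backing each store with a changelog topic in Kafka (enabled by default, including for the in-memory store in your code) and restoring the store from that topic, or from a local copy if one is still present, before processing resumes.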
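A minimal plain-Java sketch of the changelog idea follows. It is a simplified model, not Kafka Streams' implementation, and assumes a single input partition modelled as a `List`, the changelog topic modelled as a second `List` that survives the restart, and a crash modelled as creating a new store object. `ChangelogRestartDemo` and `ChangelogStore` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChangelogRestartDemo {
    // Writes offsets [from, to) into the store and returns the next offset to commit
    static int process(ChangelogStore store, List<Map.Entry<Long, String>> partition, int from, int to) {
        for (int offset = from; offset < to; offset++) {
            Map.Entry<Long, String> rec = partition.get(offset);
            store.put(rec.getKey(), rec.getValue());
        }
        return to;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> partition = List.of(
                Map.entry(1L, "books"), Map.entry(2L, "music"), Map.entry(3L, "games"));
        // Stands in for the changelog topic, which lives in Kafka and survives restarts
        List<Map.Entry<Long, String>> changelogTopic = new ArrayList<>();

        ChangelogStore first = new ChangelogStore(changelogTopic);
        int committed = process(first, partition, 0, 2);

        // Crash, stop or rebalance: a fresh store is restored from the changelog first
        ChangelogStore restarted = new ChangelogStore(changelogTopic);
        process(restarted, partition, committed, partition.size());

        System.out.println(restarted.local); // prints {1=books, 2=music, 3=games}
    }
}

// Hypothetical changelog-backed store: every put updates the local map and is
// appended to the changelog; a new instance replays the changelog on creation
class ChangelogStore {
    final Map<Long, String> local = new HashMap<>();
    private final List<Map.Entry<Long, String>> changelog;

    ChangelogStore(List<Map.Entry<Long, String>> changelog) {
        this.changelog = changelog;
        // Restoration: rebuild local state before any new record is processed
        for (Map.Entry<Long, String> e : changelog) {
            local.put(e.getKey(), e.getValue());
        }
    }

    void put(Long key, String value) {
        local.put(key, value);
        changelog.add(Map.entry(key, value));
    }
}
```

Real restoration is more involved: key-value changelog topics are compacted, a persistent RocksDB store may only need to replay the changelog from its local checkpoint onward, and with at-least-once processing some records after the last commit can be processed twice. The sketch only captures the core idea that state is rebuilt from a durable log rather than from the input topic.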
