Kafka Streams local state stores

Question

I have a simple Streams application that takes one topic as an input stream and transforms each KeyValue into another one, like this:

StoreBuilder<KeyValueStore<Long, CategoryDto>> builder =
    Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),
                                Serdes.Long(), CATEGORY_JSON_SERDE);
streamsBuilder.addStateStore(builder)
              .stream(categoryTopic, Consumed.with(Serdes.Long(), CATEGORY_JSON_SERDE))
              .transform(CategoryTransformer::new, CategoryTransformer.STORE_NAME);

static class CategoryTransformer implements Transformer<Long, CategoryDto, KeyValue<Long, CategoryDto>> {

    static final String STORE_NAME = "test-store";

    private KeyValueStore<Long, CategoryDto> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // Look up the store that was attached to this transformer by name
        store = (KeyValueStore<Long, CategoryDto>) context.getStateStore(STORE_NAME);
    }

    @Override
    public KeyValue<Long, CategoryDto> transform(Long key, CategoryDto value) {
        // Remember the latest value per key, then forward the record unchanged
        store.put(key, value);
        return KeyValue.pair(key, value);
    }

    // Only needed on older Kafka versions: Transformer#punctuate was deprecated
    // in Kafka 1.0 in favor of ProcessorContext#schedule(...) and removed later.
    @Override
    public KeyValue<Long, CategoryDto> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {
    }
}

Here I had to use a Transformer because I need to access the store and update the relevant value.

The question is: what is the difference between using local state stores and just putting values into a simple HashMap inside a ForeachAction?

What is the advantage of using local state stores in this case?

Answer

Although it is not shown in your code, I'm assuming you somehow read and use the stored state.

Storing your state in a simple (in-memory) HashMap makes it not persistent at all: your state will be lost when any of the following happens (none of these are out of the ordinary, so assume they will happen quite often):

  • your stream processor/application stops,
  • it crashes, or
  • it is partially migrated elsewhere (to another JVM) due to a rebalance.

The problem with non-persistent state is that, when any of the above happens, Kafka Streams restarts processing at the last committed offset. Records processed before that offset are therefore not reprocessed, which means your HashMap will be empty when processing restarts. This is certainly not what you want.
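To make the failure mode concrete, here is a minimal Java sketch. It does not use Kafka at all: it is a simplified, hypothetical model in which a List plays the role of the input topic, a HashMap is the only copy of the state, and processing resumes from the last committed offset after a simulated crash. The names HashMapStateDemo, Rec, process and runWithCrash are made up for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model (NOT the Kafka Streams API): the List is the
// input "topic", the HashMap is the only copy of the state, and a restart
// resumes from the last committed offset.
class HashMapStateDemo {

    record Rec(long key, String value) {}

    // Applies records [from, to) to the state and returns the next offset to commit.
    static int process(List<Rec> topic, int from, int to, Map<Long, String> state) {
        for (int offset = from; offset < to; offset++) {
            Rec r = topic.get(offset);
            state.put(r.key(), r.value());
        }
        return to;
    }

    static Map<Long, String> runWithCrash(List<Rec> topic, int crashAfter) {
        Map<Long, String> state = new HashMap<>();
        // Process the first records and commit their offset.
        int committed = process(topic, 0, crashAfter, state);

        // Crash / stop / rebalance: the JVM-local HashMap is gone.
        state = new HashMap<>();

        // Restart at the committed offset: earlier records are never replayed.
        process(topic, committed, topic.size(), state);
        return state;
    }

    public static void main(String[] args) {
        List<Rec> topic = List.of(new Rec(1, "books"), new Rec(2, "music"), new Rec(3, "games"));
        System.out.println(runWithCrash(topic, 2)); // prints {3=games}
    }
}
```

Because the records at offsets 0 and 1 were committed before the crash, they are never read again, so keys 1 and 2 are simply missing from the rebuilt map.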

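On the other hand, if you use one of the provided state stores, Kafka Streams will ensure that, once processing restarts after any of the interruptions listed above, the state is available as if processing had never stopped, without reprocessing any of the previously processed records. It does this by backing each store (by default, including in-memory stores like the one in your code) with a changelog topic in Kafka, from which the store is restored before processing resumes.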
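The restore mechanism can be sketched with a simplified model. This is not the Kafka Streams API: the hypothetical LoggedStore class writes every put to a changelog (here a plain List standing in for the changelog topic, which outlives the processing instance) and replays that changelog whenever a new instance is created, before processing resumes from the committed offset. All class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a changelog-backed store (NOT the Kafka
// Streams API). The "changelog" List stands in for a Kafka changelog topic:
// it survives the crash of the processing instance.
class ChangelogStoreDemo {

    record Rec(long key, String value) {}

    static final class LoggedStore {
        private final Map<Long, String> local = new HashMap<>();
        private final List<Rec> changelog; // durable, outlives this instance

        LoggedStore(List<Rec> changelog) {
            this.changelog = changelog;
            // Restore: replay the changelog before any new record is processed.
            for (Rec r : changelog) {
                local.put(r.key(), r.value());
            }
        }

        void put(long key, String value) {
            local.put(key, value);
            changelog.add(new Rec(key, value)); // also record the update durably
        }

        Map<Long, String> snapshot() {
            return new HashMap<>(local);
        }
    }

    // Applies records [from, to) to the store and returns the next offset to commit.
    static int process(List<Rec> topic, int from, int to, LoggedStore store) {
        for (int offset = from; offset < to; offset++) {
            Rec r = topic.get(offset);
            store.put(r.key(), r.value());
        }
        return to;
    }

    static Map<Long, String> runWithCrash(List<Rec> topic, int crashAfter) {
        List<Rec> changelog = new ArrayList<>(); // stands in for the changelog topic
        int committed = process(topic, 0, crashAfter, new LoggedStore(changelog));

        // Crash / stop / rebalance: the local map is lost, the changelog is not.
        LoggedStore restored = new LoggedStore(changelog);

        // Resume at the committed offset; earlier records are not reprocessed.
        process(topic, committed, topic.size(), restored);
        return restored.snapshot();
    }

    public static void main(String[] args) {
        List<Rec> topic = List.of(new Rec(1, "books"), new Rec(2, "music"), new Rec(3, "games"));
        System.out.println(runWithCrash(topic, 2)); // prints {1=books, 2=music, 3=games}
    }
}
```

Real Kafka Streams additionally uses compacted changelog topics, optional standby replicas and, for persistent stores, local RocksDB files to speed up restoration; the sketch ignores all of that and only shows why the state survives.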
