How to use a persisted StateStore between two Kafka Streams

Problem description

I'm having some trouble trying to achieve the following with Kafka Streams:

  • At application startup, the (compacted) topic alpha is loaded into a key-value StateStore map.
  • A Kafka Stream consumes from another topic, looks up values (.get) in the map above, and finally produces a new record into topic alpha.
  • As a result, the in-memory map should stay aligned with the underlying topic, even if the streamer is restarted.

My approach is as follows:

val builder = new StreamsBuilderS()

val store = Stores.keyValueStoreBuilder(
   Stores.persistentKeyValueStore("store"), kSerde, vSerde
)

builder.addStateStore(store)

val loaderStreamer = new LoaderStreamer(store).startStream()

[...] // I wait a few seconds until the loading is complete and the stream is running

val map = instance.store("store", QueryableStoreTypes.keyValueStore[K, V]()) // !!!!!!!! ERROR HERE !!!!!!!!

builder
  .stream("another-topic")(Consumed.`with`(kSerde, vSerde))
  .doMyAggregationsAndgetFromTheMapAbove
  .transform(() => new StoreTransformer[K, V]("store"), "store")
  .to("alpha")(Produced.`with`(kSerde, vSerde))

LoaderStreamer(store):

[...]
val builder = new StreamsBuilderS()

builder.addStateStore(store)

builder
  .table("alpha")(Consumed.`with`(kSerde, vSerde))

builder.build
[...]

StoreTransformer:

[...]
override def init(context: ProcessorContext): Unit = {
  this.context = context
  this.store = 
    context.getStateStore(store).asInstanceOf[KeyValueStore[K, V]]
}

override def transform(key: K, value: V): (K, V) = {
  store.put(key, value)
  (key, value)
}
[...]

...but what I get is:

Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
The state store, store, may have migrated to another instance.

while trying to get the store handle.

Any idea on how to achieve this?

Thanks!

Recommended answer

You can't share a state store between two Kafka Streams applications. Each KafkaStreams instance only manages and exposes the stores registered in its own topology.

According to the documentation (https://docs.confluent.io/current/streams/faq.html#interactive-queries), there are two possible reasons for the above exception:

  • The local KafkaStreams instance is not yet ready, so its local state stores cannot be queried yet.
  • The local KafkaStreams instance is ready, but the particular state store was just migrated to another instance behind the scenes.

The easiest way to deal with it is to wait until the state store becomes queryable:

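import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreType;

// Blocks until the named store can be queried, polling every 100 ms.
// Note: since Kafka Streams 2.5 the preferred lookup is
// streams.store(StoreQueryParameters.fromNameAndType(storeName, queryableStoreType)).
public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                              final QueryableStoreType<T> queryableStoreType,
                                              final KafkaStreams streams) throws InterruptedException {
  while (true) {
    try {
      return streams.store(storeName, queryableStoreType);
    } catch (InvalidStateStoreException ignored) {
      // store not yet ready for querying; back off briefly and retry
      Thread.sleep(100);
    }
  }
}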

The whole example can be found on Confluent's GitHub.
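
The wait loop above retries forever, so a store that never becomes available (for example, because it was registered in a different KafkaStreams instance) would block the caller indefinitely. A minimal sketch of a bounded variant follows. It uses only the Java standard library; the names StoreRetry and retryUntil, and the timeout and pause parameters, are hypothetical and not part of Kafka Streams.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical helper, not a Kafka Streams API: retries a lookup until it
// succeeds or a deadline passes.
final class StoreRetry {
    private StoreRetry() {}

    /**
     * Calls lookup repeatedly until it returns a value.
     * Only exceptions of type retryOn are treated as "not ready yet";
     * any other exception is rethrown immediately. Once the timeout has
     * elapsed, the last "not ready" exception is rethrown to the caller.
     */
    static <T> T retryUntil(Supplier<T> lookup,
                            Class<? extends RuntimeException> retryOn,
                            Duration timeout,
                            Duration pause) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                // Give up on unexpected exceptions or when the deadline has passed.
                if (!retryOn.isInstance(e) || System.nanoTime() - deadline >= 0) {
                    throw e;
                }
                try {
                    Thread.sleep(pause.toMillis());
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```

Assuming the same streams, storeName, and queryableStoreType as in the answer, the lookup would be passed as () -> streams.store(storeName, queryableStoreType), with InvalidStateStoreException.class as the retryable type. A timeout does not make the store shareable across applications; it only turns a silent hang into a visible failure.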
