How to use a persisted StateStore between two Kafka Streams
Question
I'm having some trouble trying to achieve the following via Kafka Streams:
- At startup of the app, the (compacted) topic alpha gets loaded into a key-value StateStore map
- A Kafka Stream consumes from another topic, reads (.get) from the map above, and finally produces a new record into topic alpha
- The result is that the in-memory map should stay aligned with the underlying topic, even if the streamer gets restarted.
My approach is the following:
val builder = new StreamsBuilderS()
val store = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("store"), kSerde, vSerde
)
builder.addStateStore(store)

val loaderStreamer = new LoaderStreamer(store).startStream()
[...] // I wait a few seconds until the loading is complete and the stream is running

val map = instance.store("store", QueryableStoreTypes.keyValueStore[K, V]()) // !!!!!!!! ERROR HERE !!!!!!!!

builder
  .stream("another-topic")(Consumed.`with`(kSerde, vSerde))
  .doMyAggregationsAndgetFromTheMapAbove
  .transform(() => new StoreTransformer[K, V]("store"), "store")
  .to("alpha")(Produced.`with`(kSerde, vSerde))
LoaderStreamer(store):
[...]
val builder = new StreamsBuilderS()
builder.addStateStore(store)
builder
.table("alpha")(Consumed.`with`(kSerde, vSerde))
builder.build
[...]
StoreTransformer:
[...]
override def init(context: ProcessorContext): Unit = {
  this.context = context
  this.store = context.getStateStore(store).asInstanceOf[KeyValueStore[K, V]]
}

override def transform(key: K, value: V): (K, V) = {
  store.put(key, value)
  (key, value)
}
[...]
...but what I get is:
Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
The state store, store, may have migrated to another instance.
while trying to get the store handler.
Any idea on how to achieve this?
Thanks!
Answer
You can't share a state store between two Kafka Streams applications.
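If both topologies instead live in one application, the state backed by alpha is local to the stream that needs it, and the usual way to read the table from the stream is a stream-table join rather than a shared store. A minimal wiring sketch of that idea in the Java DSL, not the asker's code (serdes are simplified to String, and merge() stands in for whatever combination logic is needed):

```java
StreamsBuilder builder = new StreamsBuilder();

// One application owns both sides, so the state backed by "alpha" is local.
KTable<String, String> alpha = builder.table("alpha",
        Consumed.with(Serdes.String(), Serdes.String()));

// Enrich each incoming record with the current value from "alpha",
// then write the result back to "alpha" itself.
builder.stream("another-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .leftJoin(alpha, (incoming, current) -> merge(incoming, current)) // merge(): your logic, hypothetical
    .to("alpha", Produced.with(Serdes.String(), Serdes.String()));
```

Because Kafka Streams maintains the table's state itself, this also survives restarts without any manual "wait until loaded" step.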
According to the documentation (https://docs.confluent.io/current/streams/faq.html#interactive-queries), there might be two reasons for the above exception:
- The local KafkaStreams instance is not yet ready and thus its local state stores cannot be queried yet.
- The local KafkaStreams instance is ready, but the particular state store was just migrated to another instance behind the scenes.
The easiest way to deal with it is to wait until the state store becomes queryable:
public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                              final QueryableStoreType<T> queryableStoreType,
                                              final KafkaStreams streams) throws InterruptedException {
  while (true) {
    try {
      return streams.store(storeName, queryableStoreType);
    } catch (InvalidStateStoreException ignored) {
      // store not yet ready for querying
      Thread.sleep(100);
    }
  }
}
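The retry loop above can hang forever if the store never appears (e.g. a typo in the store name). The same pattern with a deadline can be sketched independently of Kafka; everything here (class and method names included) is a hypothetical helper, not library API:

```java
import java.util.function.Supplier;

public class WaitUntilReady {
    // Retries the supplier until it stops throwing or the deadline passes.
    // Mirrors the waitUntilStoreIsQueryable loop, plus a timeout so a
    // permanently missing store cannot block the caller forever.
    public static <T> T waitUntilReady(Supplier<T> supplier,
                                       long timeoutMs,
                                       long pollMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return supplier.get();
            } catch (RuntimeException notReadyYet) {
                if (System.currentTimeMillis() >= deadline) {
                    throw notReadyYet; // give up: rethrow the last failure
                }
                Thread.sleep(pollMs);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated "store" that becomes queryable on the third attempt.
        int[] attempts = {0};
        String result = waitUntilReady(() -> {
            if (++attempts[0] < 3) throw new IllegalStateException("not ready");
            return "store-handle";
        }, 5_000, 10);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

With Kafka, the supplier would be `() -> streams.store(storeName, queryableStoreType)`, and the caught exception InvalidStateStoreException instead of a generic RuntimeException.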
The whole example can be found on Confluent's GitHub.