在 Kafka Streams 上写入 GlobalStateStore [英] Write to GlobalStateStore on Kafka Streams
问题描述
我正在尝试在 Kafka DSL 上使用 addGlobalStore,其中需要存储一些我需要对所有线程/实例进行全局访问的值.
I am trying to use addGlobalStore on a Kafka DSL where a need to store few values that I will need global access for all my threads/instances.
我的问题是我需要定期更新拓扑中的这些值并使所有正在运行的线程都知道新值.
My problem is that I need periodically to update these values inside my topology and make all running threads aware of the new values.
我通过 builder.addGlobalStore
并使用处理器的 init()
函数初始化了全局存储,该函数用作此函数的最后一个参数,但我不能找到一种方法来更新全局存储中的值.
I initialized the global store through builder.addGlobalStore
and using the init()
function of a Processor that was used as the last argument on this function, but I cannot find a way to update the values inside the global store.
我的拓扑的下一步是一个 Transformer,我可以通过全局 Store 上的 ```init()`` 获得一个钩子并读取存储的值,但不幸的是我无法全局更新它们.我的意思是我可以更新正在运行的线程的本地副本,但其他线程/实例看不到更改.
The next step on my Topology is a Transformer where I can get a hook through ```init()`` on the global Store and read the stored values but unfortunately I cannot updated them globally. I mean I can update the local copy for the running thread but other threads/instances cannot see the change.
我在某处读到这不能在 Transformer 上完成,但即使我使用处理器,问题仍然存在
I read somewhere that this cannot be done on Transformer, but even I use a Processor instead the issue remains
那么,有没有办法在 Kafka DSL 拓扑上更新 globalStateStore,如果是这样,这怎么可能?或者为了使用全局存储,我需要使用低级处理器 API 吗?
So, Is there a way to update globalStateStore on a Kafka DSL topology, and if so how is this possible ? Or in order to use global store do I need to use the low level processor API ?
推荐答案
我通过 builder.addGlobalStore 并使用处理器的 init() 函数初始化了全局存储,该函数用作此函数的最后一个参数,但我找不到更新全局存储中值的方法.>
您不能直接更新全局商店.相反,您必须更新(= 向其写入消息)该全局存储的基础主题.
You cannot update a global store directly. Instead, you must update (= write a message to) the underlying topic of that global store.
这篇关于在 Kafka Streams 上写入 GlobalStateStore的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!