在Kafka Streams上写入GlobalStateStore [英] Write to GlobalStateStore on Kafka Streams

查看:89
本文介绍了在Kafka Streams上写入GlobalStateStore的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图在需要存储一些值的Kafka DSL上使用addGlobalStore,而我需要对所有线程/实例进行全局访问.

I am trying to use addGlobalStore on a Kafka DSL where a need to store few values that I will need global access for all my threads/instances.

我的问题是我需要定期更新拓扑中的这些值,并使所有正在运行的线程知道新值.

My problem is that I need periodically to update these values inside my topology and make all running threads aware of the new values.

我通过builder.addGlobalStore并使用了用作该函数最后一个参数的Processor的init()函数初始化了全局存储,但是我找不到更新全局存储中值的方法.

I initialized the global store through builder.addGlobalStore and using the init() function of a Processor that was used as the last argument on this function, but I cannot find a way to update the values inside the global store.

我的拓扑的下一步是一个Transformer,在这里我可以通过全局存储上的``init()"获取一个钩子,并读取存储的值,但是不幸的是,我无法全局更新它们.我的意思是我可以更新 正在运行的线程的本地副本,但其他线程/实例看不到更改.

The next step on my Topology is a Transformer where I can get a hook through ```init()`` on the global Store and read the stored values but unfortunately I cannot updated them globally. I mean I can update the local copy for the running thread but other threads/instances cannot see the change.

我在某处读到它无法在Transformer上完成,但是即使我使用处理器,问题仍然存在

I read somewhere that this cannot be done on Transformer, but even I use a Processor instead the issue remains

因此,有没有办法在Kafka DSL拓扑上更新globalStateStore, 如果可以,这怎么可能?还是为了使用全局存储,我需要使用低级处理器API吗?

So, Is there a way to update globalStateStore on a Kafka DSL topology, and if so how is this possible ? Or in order to use global store do I need to use the low level processor API ?

推荐答案

我通过builder.addGlobalStore并使用了用作该函数最后一个参数的Processor的init()函数初始化了全局存储,但是我找不到更新全局存储中值的方法.

I initialized the global store through builder.addGlobalStore and using the init() function of a Processor that was used as the last argument on this function, but I cannot find a way to update the values inside the global store.

您不能直接更新全局存储.相反,您必须更新(=向其写入消息)该全局存储区的基础主题.

You cannot update a global store directly. Instead, you must update (= write a message to) the underlying topic of that global store.

这篇关于在Kafka Streams上写入GlobalStateStore的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆