如何在 spring-cloud-stream 中的 kafka 进程拓扑中使用交互式查询? [英] How to use interactive query within kafka process topology in spring-cloud-stream?
问题描述
是否可以在 Spring Cloud Stream 中使用带有 @EnableBinding 批注的类或在带有 @StreamListener 的方法中使用交互式查询 (InteractiveQueryService)?我尝试在提供的 KStreamMusicSampleApplication 类和处理方法,但始终为空.
Is it possible to use interactive query (InteractiveQueryService) within Spring Cloud Stream the class with @EnableBinding annotation or within the method with @StreamListener? I tried instantiating ReadOnlyKeyValueStore within provided KStreamMusicSampleApplication class and process method but its always null.
我的@StreamListener 方法正在侦听一堆 KTable 和 KStream,并且在处理拓扑(例如过滤)期间,我必须检查来自 KStream 的密钥是否已存在于特定 KTable 中.
My @StreamListener method is listening to a bunch of KTables and KStreams and during the process topology e.g filtering, I have to check whether the key from a KStream already exists in a particular KTable.
我试图弄清楚如何扫描传入的 KTable 以检查密钥是否已存在但没有运气.然后我遇到了 InteractiveQueryService,它的 get() 方法可用于检查 KTable 中的 state store materializedAs 中是否存在键.问题是我无法从进程拓扑(@EnableBinding 或@StreamListener)访问它.它只能从这些注释之外访问,例如 RestController.
I tried to figure out how to scan an incoming KTable to check if a key already exists but no luck. Then I came across InteractiveQueryService whose get() method could be used to check if a key exists inside a state store materializedAs from a KTable. The problem is that I can't access it from with the process topology (@EnableBinding or @StreamListener). It can only be accessed from outside these annotation e.g RestController.
有没有办法扫描传入的 KTable 以检查键或值是否存在?如果没有,那么我们可以在流程拓扑中访问 InteractiveQueryService 吗?
Is there a way to scan an incoming KTable to check for the existence of a key or value? if not then can we access InteractiveQueryService within the process topology?
推荐答案
InteractiveQueryService
在 Spring Cloud Stream 中无法在 StreamListener
中的实际拓扑中使用.正如您所提到的,它应该在您的主要拓扑之外使用.但是,对于您描述的用例,您仍然可以使用主流程中的状态存储.例如,如果您有一个传入的 KStream
和一个被物化为状态存储的 KTable
,那么您可以在 process
上调用 process
>KStream 并以这种方式访问状态存储.这是实现这一目标的粗略代码.您需要将其转换为适合您的特定用例,但这是一个想法.
InteractiveQueryService
in Spring Cloud Stream is not available to be used within the actual topology in your StreamListener
. As you mentioned, it is supposed to be used outside of your main topology. However, with the use case you described, you still can use the state store from your main flow. For example, if you have an incoming KStream
and a KTable
which is materialized as a state store, then you can call process
on the KStream
and access the state store that way. Here is a rough code to achieve that. You need to convert this to fit into your specific use case, but here is the idea.
ReadOnlyKeyValueStore<Object, String> store;
input.process(() -> new Processor<Object, Product>() {
@Override
public void init(ProcessorContext processorContext) {
store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");
}
@Override
public void process(Object key, Object value) {
//find the key
store.get(key);
}
@Override
public void close() {
if (state != null) {
state.close();
}
}
}, "my-store");
这篇关于如何在 spring-cloud-stream 中的 kafka 进程拓扑中使用交互式查询?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!