如何在 spring-cloud-stream 中的 kafka 进程拓扑中使用交互式查询? [英] How to use interactive query within kafka process topology in spring-cloud-stream?

查看:16
本文介绍了如何在 spring-cloud-stream 中的 kafka 进程拓扑中使用交互式查询?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以在 Spring Cloud Stream 中使用带有 @EnableBinding 批注的类或在带有 @StreamListener 的方法中使用交互式查询 (InteractiveQueryService)?我尝试在提供的 KStreamMusicSampleApplication 类和处理方法,但始终为空.

Is it possible to use interactive query (InteractiveQueryService) within Spring Cloud Stream the class with @EnableBinding annotation or within the method with @StreamListener? I tried instantiating ReadOnlyKeyValueStore within provided KStreamMusicSampleApplication class and process method but its always null.

我的@StreamListener 方法正在侦听一堆 KTable 和 KStream,并且在处理拓扑(例如过滤)期间,我必须检查来自 KStream 的密钥是否已存在于特定 KTable 中.

My @StreamListener method is listening to a bunch of KTables and KStreams and during the process topology e.g filtering, I have to check whether the key from a KStream already exists in a particular KTable.

我试图弄清楚如何扫描传入的 KTable 以检查密钥是否已存在但没有运气.然后我遇到了 InteractiveQueryService,它的 get() 方法可用于检查 KTable 中的 state store materializedAs 中是否存在键.问题是我无法从进程拓扑(@EnableBinding 或@StreamListener)访问它.它只能从这些注释之外访问,例如 RestController.

I tried to figure out how to scan an incoming KTable to check if a key already exists but no luck. Then I came across InteractiveQueryService whose get() method could be used to check if a key exists inside a state store materializedAs from a KTable. The problem is that I can't access it from with the process topology (@EnableBinding or @StreamListener). It can only be accessed from outside these annotation e.g RestController.

有没有办法扫描传入的 KTable 以检查键或值是否存在?如果没有,那么我们可以在流程拓扑中访问 InteractiveQueryService 吗?

Is there a way to scan an incoming KTable to check for the existence of a key or value? if not then can we access InteractiveQueryService within the process topology?

推荐答案

InteractiveQueryService 在 Spring Cloud Stream 中无法在 StreamListener 中的实际拓扑中使用.正如您所提到的,它应该在您的主要拓扑之外使用.但是,对于您描述的用例,您仍然可以使用主流程中的状态存储.例如,如果您有一个传入的 KStream 和一个被物化为状态存储的 KTable,那么您可以在 process 上调用 process>KStream 并以这种方式访问​​状态存储.这是实现这一目标的粗略代码.您需要将其转换为适合您的特定用例,但这是一个想法.

InteractiveQueryService in Spring Cloud Stream is not available to be used within the actual topology in your StreamListener. As you mentioned, it is supposed to be used outside of your main topology. However, with the use case you described, you still can use the state store from your main flow. For example, if you have an incoming KStream and a KTable which is materialized as a state store, then you can call process on the KStream and access the state store that way. Here is a rough code to achieve that. You need to convert this to fit into your specific use case, but here is the idea.

ReadOnlyKeyValueStore<Object, String> store;

 input.process(() -> new Processor<Object, Product>() {

                @Override
                public void init(ProcessorContext processorContext) {
                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");


                }

                @Override
                public void process(Object key, Object value) {
                    //find the key
                    store.get(key);
                }

                @Override
                public void close() {
                    if (state != null) {
                        state.close();
                    }
                }
            }, "my-store");

这篇关于如何在 spring-cloud-stream 中的 kafka 进程拓扑中使用交互式查询?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆