如何在spring-cloud-stream中的kafka进程拓扑中使用交互式查询? [英] How to use interactive query within kafka process topology in spring-cloud-stream?

查看:113
本文介绍了如何在spring-cloud-stream中的kafka进程拓扑中使用交互式查询?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以在Spring Cloud Stream中使用具有@EnableBinding批注的类或在具有@StreamListener的方法中使用交互式查询(InteractiveQueryService)?我尝试实例化提供的

Is it possible to use interactive query (InteractiveQueryService) within Spring Cloud Stream the class with @EnableBinding annotation or within the method with @StreamListener? I tried instantiating ReadOnlyKeyValueStore within provided KStreamMusicSampleApplication class and process method but its always null.

我的@StreamListener方法正在监听一堆KTables和KStreams,并且在过程拓扑(例如过滤)期间,我必须检查KStream的键是否已存在于特定的KTable中.

My @StreamListener method is listening to a bunch of KTables and KStreams and during the process topology e.g filtering, I have to check whether the key from a KStream already exists in a particular KTable.

我试图弄清楚如何扫描传入的KTable来检查密钥是否已经存在但没有运气.然后,我遇到了InteractiveQueryService,它的get()方法可用于检查KTable的状态存储materializedAs中是否存在键.问题是我无法从进程拓扑(@EnableBinding或@StreamListener)访问它.只能从这些注释外部访问它,例如RestController.

I tried to figure out how to scan an incoming KTable to check if a key already exists but no luck. Then I came across InteractiveQueryService whose get() method could be used to check if a key exists inside a state store materializedAs from a KTable. The problem is that I can't access it from with the process topology (@EnableBinding or @StreamListener). It can only be accessed from outside these annotation e.g RestController.

有没有一种方法可以扫描传入的KTable来检查键或值的存在?如果不是,那么我们可以在流程拓扑中访问InteractiveQueryService吗?

Is there a way to scan an incoming KTable to check for the existence of a key or value? if not then can we access InteractiveQueryService within the process topology?

推荐答案

InteractiveQueryService.如前所述,应该在主拓扑之外使用它.但是,对于您描述的用例,您仍然可以从主流程中使用状态存储.例如,如果您有一个传入的KStream和一个实现为状态存储的KTable,则可以在KStream上调用process并以这种方式访问​​状态存储.这是实现此目的的粗略代码.您需要将其转换为适合您的特定用例的方法,但这是一个主意.

InteractiveQueryService in Spring Cloud Stream is not available to be used within the actual topology in your StreamListener. As you mentioned, it is supposed to be used outside of your main topology. However, with the use case you described, you still can use the state store from your main flow. For example, if you have an incoming KStream and a KTable which is materialized as a state store, then you can call process on the KStream and access the state store that way. Here is a rough code to achieve that. You need to convert this to fit into your specific use case, but here is the idea.

ReadOnlyKeyValueStore<Object, String> store;

 input.process(() -> new Processor<Object, Product>() {

                @Override
                public void init(ProcessorContext processorContext) {
                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");


                }

                @Override
                public void process(Object key, Object value) {
                    //find the key
                    store.get(key);
                }

                @Override
                public void close() {
                    if (state != null) {
                        state.close();
                    }
                }
            }, "my-store");

这篇关于如何在spring-cloud-stream中的kafka进程拓扑中使用交互式查询?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆