Spring Cloud @StreamListener 条件已弃用什么是替代方案 [英] Spring Cloud @StreamListener condition deprecated what is the alternative

本文介绍了Spring Cloud @StreamListener 条件已弃用什么是替代方案的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们有多个应用程序消费者监听同一个 kafka 主题,生产者在向主题发送消息时设置消息头,以便特定实例可以评估头并处理消息.例如

We have multiple applications consumer listening to the same kafka topic and a producer sets the message header when sending message to the topic so specific instance can evaluate the header and process the message. eg

@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
    log.info("sydney order request received message is {}",message.getName());
}

在 Spring Cloud Stream 3.0.0 中,@StreamListener 已弃用,我在 Function 中找不到与 condition 属性等效的属性.

In Spring Cloud Stream 3.0.0 the @StreamListener is deprecated and I could not find the equivalent of the condition property in Function.

有什么建议吗?

推荐答案

虽然我也找不到函数式方法的等效方案,但我有一个建议.

Though I was not able to find the equivalent for the functional approach either, I do have a suggestion.

@StreamListener 注释条件并不能阻止应用程序必须使用消息、读取其标头并在将其传递给侦听器之前过滤掉特定记录的事实(fullfillOrder()).因此,可以安全地假设您正在消费每条命中主题的消息(通过 Spring Cloud 在幕后为我们实现的事件接收器),但侦听器仅在 header == sydney 时执行.

The @StreamListener annotations condition does not stop the fact that the application must consume the message, read its header, and filter out specific records before passing it to the listener (fullfillOrder()). So it's safe to assume you're consuming every message that hits the topic regardless (by the event receiver that Spring Cloud has implemented for us under the hood), but the listener only gets executed when header == sydney.

如果有一种方法可以配置 Spring Cloud 使用的事件接收器(在点击侦听器之前丢弃消息),我建议研究一下.如果没有,将在进行任何处理之前过滤掉任何消息(非悉尼).如果您熟悉 Spring Cloud 的函数式方法,看起来像这样:

If there was a way to configure the event receiver that Spring Cloud uses (to discard message before hitting listener), I would suggest looking into that. If not, would resort to filtering out any messages (non-sydney) before doing any processing. If you're familiar with Spring Cloud's functional approach, would look something like this:

@Bean
public Consumer<Message<TestObj>> fulfillOrder() {
    return msg -> {
        // to get header - msg.getHeaders().get(key, valueType);
        // filter out bad messages
    }
}

@Bean
public Consumer<ConsumerRecord<?, TestObj>> fulfillOrder() {
    return msg -> {
        // msg.headers().lastHeader("franchiseName").value() -> filter em out
    }
}

其他:^ 我的代码假设您通过 spring-cloud-stream-binder-kafka 将 kafka-client API 与 Spring Cloud 流集成.根据列出的标签,我会注意到 Spring Cloud Stream 有两个版本的 Kafka 绑定器 - 一个用于 kafka 客户端库,一个用于 kafka 流库.

Other: ^ my code assumes you're integrating the kafka-client API with Spring cloud stream via spring-cloud-stream-binder-kafka. based on tags listed, i will note Spring Cloud Stream has two versions of binders for Kafka - one for the kafka client library, and one for kafka streams library.

在不考虑 Spring Cloud/Frameworks 的情况下,kafka 流中的高级别 DSL 不会让您访问标头,但低级别处理器 API 可以.从示例中,您似乎在利用客户端绑定器而不是 spring-cloud-stream-binder-kafka-streams/kafka 流绑定器.我还没有看到使用低级处理器 API 的 Spring Cloud 流 + kafka 流绑定器的实现,所以我不知道这是否是目标.

Without considering Spring Cloud / Frameworks, the high-lvl DSL in kafka streams doesn't give you access to headers, but the low-level Processor API does. From the example, it seems like you're leveraging the client binder and not spring-cloud-stream-binder-kafka-streams / kafka streams binder. I haven't seen an implementation of spring cloud stream + kafka streams binder using the low-level processor API, so i can't tell if that was the aim.

这篇关于Spring Cloud @StreamListener 条件已弃用什么是替代方案的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆