使用Spring EL将可选的后缀从属性添加到@KafkaListener中的Consumer Group [英] Using Spring EL to add optional postfix from properties to consumerGroup in @KafkaListener

查看:0
本文介绍了使用Spring EL将可选的后缀从属性添加到@KafkaListener中的Consumer Group的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个简单的适用于Kafka消费者的Spring Boot应用程序,如下所示

@KafkaListener(topics="topic", groupId="SOME_CONSTANT") {
....
}
我需要做的是添加可选的Spring Boot属性(从环境变量,但这并不重要),假设: myapp.env: TEST

当该变量存在时,我应该自动将消费者组更新为 SOME_CONSTANT-TEST

我在玩Spel

@KafkaListener(topics="topic", groupId="#{ '${myApp.env}' == null ? 'SOME_CONSTANT' : 'SOME_CONSTANT' + '-' + '${myApp.env}}'") {
....
}

但这似乎不起作用:/有什么想法吗?

推荐答案

您可以使用T运算符读取常量的值,在没有环境变量的情况下使用冒号‘:’:

@KafkaListener(topics="topic", groupId="#{ '${my.app.env:}' == '' ? T(com.mypackage.MyListener).SOME_CONSTANT : T(com.mypackage.MyListener).SOME_CONSTANT + '-' + '${my.app.env:}'}")

以下是使用此解决方案的示例应用程序:

package org.spring.kafka.playground;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class SO71291726 {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(SO71291726.class, args);
        try {
            Thread.sleep(10000);
        }
        catch (InterruptedException e) {
            Thread.interrupted();
            throw new RuntimeException("Interrupted");
        }
        KafkaOperations kafkaTemplate = context.getBean("kafkaTemplate", KafkaOperations.class);
        kafkaTemplate.send("topic", "My message");
    }

    Logger log = LoggerFactory.getLogger(this.getClass());

    public static final String SOME_CONSTANT = "my-group-id-constant";

    @Component
    class MyListener {

        @KafkaListener(topics="topic", groupId="#{ '${71291726.my.app.env:}' == '' ? T(org.spring.kafka.playground.SO71291726).SOME_CONSTANT : T(org.spring.kafka.playground.SO71291726).SOME_CONSTANT + '-' + '${71291726.my.app.env:}'}")
        void listen(String message, @Header(KafkaHeaders.GROUP_ID) String groupId) {
            log.info("Received message {} from group id {} ", message, groupId);
        }
    }
}

输出: 2022-02-28 14:26:14.733 INFO 18841 --- [ntainer#0-0-C-1] 1291726$$EnhancerBySpringCGLIB$$cf264156 : Received message My message from group id my-group-id-constant

如果我将71291726.my.app.env = TEST添加到application.properties文件:

2022-02-28 14:34:03.900 INFO 18870 --- [ntainer#0-0-C-1] 1291726$$EnhancerBySpringCGLIB$$e1a5933e : Received message My message from group id my-group-id-constant-TEST

这篇关于使用Spring EL将可选的后缀从属性添加到@KafkaListener中的Consumer Group的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆