我可以在运行时向我的@kafkalistener添加主题吗 [英] Can i add topics to my @kafkalistener at runtime

查看:16
本文介绍了我可以在运行时向我的@kafkalistener添加主题吗的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经为主题数组创建了一个Bean,并在运行时向该主题数组添加了一些主题,但使用者没有更新主题,仍然从主题数组中的第一个主题开始消费。我希望消费者添加这些新主题并开始使用它们

@Autowired
private String[] topicArray;

@KafkaListener(topics = "#{topicArray}", groupId = "MyGroup")
    public void listen(...) {
        ...
    }

推荐答案

否;该属性在初始化期间计算一次。

不能在运行时将主题添加到现有侦听器容器。

但是,您可以将侦听器Bean设置为原型Bean,并在每次要侦听新主题时创建一个新容器。

举个例子:

@SpringBootApplication
public class So68744775Application {

    public static void main(String[] args) {
        SpringApplication.run(So68744775Application.class, args);
    }

    private String[] topics;

    private final AtomicInteger count = new AtomicInteger();

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    Foo foo() {
        return new Foo();
    }

    @Bean
    Supplier<String> idProvider() {
        return () -> "so68744775-" + count.getAndIncrement();
    }

    @Bean
    Supplier<String[]> topicProvider() {
        return () -> this.topics;
    }

    @Bean
    ApplicationRunner runner(ApplicationContext context) {
        return args -> {
            this.topics = new String[] { "topic1", "topic2" };
            context.getBean(Foo.class);
            this.topics = new String[] { "topic3" };
            context.getBean(Foo.class);
        };
    }

}

class Foo {

    @KafkaListener(id = "#{idProvider.get()}", topics = "#{topicProvider.get()}", groupId = "grp")
    public void listen(String in) {
        System.out.println(in);
    }

}
但是,省略groupId会更好,这样每个容器都在其自己的组中(id属性)。这可避免在添加新容器时进行不必要的重新平衡。

这篇关于我可以在运行时向我的@kafkalistener添加主题吗的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆