如何使用Spring Kafka实现有状态消息监听器? [英] How to implement a stateful message listener using Spring Kafka?

查看:0
本文介绍了如何使用Spring Kafka实现有状态消息监听器?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我希望使用Spring Kafka API实现有状态监听器。

提供以下信息:

  • ConCurrentKafkaListenerContainerFactory,并发设置为"n"
  • Spring@Service类上的@KafkaListener批注方法

然后创建"n"个KafkaMessageListenerContainers。它们中的每一个都将有自己的KafkaConsumer,因此将有"n"个使用者线程-每个使用者一个线程。

消费消息时,将使用轮询底层KafkaConsumer的同一线程调用@KafkaListener方法。由于只有一个监听程序实例,因此此监听程序需要是线程安全的,因为将有来自"n"个线程的并发访问。

我不想考虑并发访问,并在我知道只能由一个线程访问的侦听器中保留状态。

如何使用Spring Kafka API为每个Kafka消费者创建单独的监听器?

推荐答案

您说得对;每个容器都有一个监听器实例(无论是配置为@KafkaListener还是MessageListener)。

一种解决方法是将作用域为MessageListener的原型与n个KafkaMessageListenerContainerBean(每个Bean有一个线程)一起使用。

然后,每个容器将获得其自己的侦听器实例。

@KafkaListenerPOJO抽象不可能做到这一点。

不过,使用无状态Bean通常更好。

编辑

我找到了另一个解决方法,使用SimpleThreadScope...

@SpringBootApplication
public class So51658210Application {

    public static void main(String[] args) {
        SpringApplication.run(So51658210Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
            KafkaListenerEndpointRegistry registry) {
        return args -> {
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
        };
    }

    @Bean
    public ActualListener actualListener() {
        return new ActualListener();
    }

    @Bean
    @Scope("threadScope")
    public ThreadScopedListener listener() {
        return new ThreadScopedListener();
    }

    @Bean
    public static CustomScopeConfigurer scoper() {
        CustomScopeConfigurer configurer = new CustomScopeConfigurer();
        configurer.addScope("threadScope", new SimpleThreadScope());
        return configurer;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so51658210", 3, (short) 1);
    }

    public static class ActualListener {

        @Autowired
        private ObjectFactory<ThreadScopedListener> listener;

        @KafkaListener(id = "foo", topics = "so51658210")
        public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            this.listener.getObject().doListen(in, partition);
        }

    }

    public static class ThreadScopedListener {

        private void doListen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println(in + ":"
                    + Thread.currentThread().getName() + ":"
                    + this.hashCode() + ":"
                    + partition);
        }

    }

}

(容器并发数为3)。

工作正常:

bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2

唯一的问题是作用域不会自动清理(例如,当容器停止并且线程离开时)。这可能并不重要,具体取决于您的用例。

要解决这个问题,我们需要来自容器的一些帮助(例如,在侦听器线程停止时在其上发布一个事件)。GH-762

这篇关于如何使用Spring Kafka实现有状态消息监听器?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆