动态启动和关闭 KafkaListener 只是为了在会话开始时加载以前的消息 [英] Dynamically start and off KafkaListener just to load previous messages at the start of a session

查看:106
本文介绍了动态启动和关闭 KafkaListener 只是为了在会话开始时加载以前的消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个用于 kafkalistener 的工作代码,用于从主题的开头(offset=0)(始终运行)读取消息.对于我的用例(消息传递),我需要两件事:

I have a working code for kafkalistener to read messages from the beginning(offset=0) of a topic (always running). For my use case (messaging) I need 2 things:

  1. 总是捕捉特定主题/分区的新消息(这个消费者总是在运行)并发送到前端 websocket+stomp.(这部分我已经有了)

  1. always catch new messages(this consumer is always running) of specific topic/partition and send to frontend websocket+stomp. (I already have this part)

启动新的消费者以获取从特定主题/分区的开始到当前的消息,只有在前端发出信号时才停止,以便可以获取这些数据(为迟到的用户或稍后加载以前的消息)通过前端 websocket+stomp(在会话开始时)

start new consumer to get messages from beginning to current of specific topic/partition, only when frontend signals and then stop after that so that these data(loading previous messages for the late user or for later) can be fetched by frontend websocket+stomp (at the beginning of its session)

如果我可以动态地(在从前端获取信号后)添加/删除带有参数的 kafkaListener(来自 post 方法的数据),它将同时提供两个服务

实际上,我该如何实现?我应该考虑使用 post 方法通知后端我现在需要加载此主题/分区的先前消息并将其发送到此.."网址吗?但是,如何在不一直运行的情况下动态启动和关闭该消费者(kafkaListener)并在那里传递参数?

actually, how can I implement this? should I think of using post method to notify backend that I need to load previous messages of this topic/partition right now and send those it to this ".." url? but then how can I dynamically start and off that consumer(kafkaListener) without running all the time and pass parameter there?

推荐答案

这是一个快速的 Spring Boot 应用程序,展示了如何动态创建容器.

Here is a quick Spring Boot application showing how to dynamically create containers.

@SpringBootApplication
public class So61950229Application {

    public static void main(String[] args) {
        SpringApplication.run(So61950229Application.class, args);
    }


    @Bean
    public ApplicationRunner runner(DynamicListener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so61950229", "foo" + i));
            System.out.println("Hit enter to start a listener");
            System.in.read();
            listener.newContainer("so61950229", 0);
            System.out.println("Hit enter to start another listener");
            System.in.read();
            listener.newContainer("so61950229", 0);
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so61950229").partitions(1).replicas(1).build();
    }

}

@Component
class DynamicListener {

    private static final Logger LOG = LoggerFactory.getLogger(DynamicListener.class);

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    private final ConcurrentMap<String, AbstractMessageListenerContainer<String, String>> containers
            = new ConcurrentHashMap<>();

    DynamicListener(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    void newContainer(String topic, int partition) {
        ConcurrentMessageListenerContainer<String, String> container =
                this.factory.createContainer(new TopicPartitionOffset(topic, partition));
        String groupId = UUID.randomUUID().toString();
        container.getContainerProperties().setGroupId(groupId);
        container.setupMessageListener((MessageListener) record -> {
            System.out.println(record);
        });
        this.containers.put(groupId, container);
        container.start();
    }

    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        AbstractMessageListenerContainer<String, String> container = this.containers.remove(
                event.getContainer(ConcurrentMessageListenerContainer.class).getContainerProperties().getGroupId());
        if (container != null) {
            LOG.info("Stopping idle container");
            container.stop(() -> LOG.info("Stopped"));
        }
    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=5000

这篇关于动态启动和关闭 KafkaListener 只是为了在会话开始时加载以前的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆