如何使用 spring-kafka 暂停和恢复@KafkaListener [英] how to pause and resume @KafkaListener using spring-kafka

查看:123
本文介绍了如何使用 spring-kafka 暂停和恢复@KafkaListener的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经实现了 Kafka 消费者,现在我有了一个场景.

I have implemented the Kafka consumer, now I have a scenario.

  1. 从 Kafka 流 2.2.5.Release 通过 Srpingboot 读取数据
  2. 加载到数据库表1
  3. 将数据从table1复制到table2
  4. 清空表格1

要执行上述操作,我需要使用石英的调度作业(已编写)暂停/恢复 Kafka 使用者,该作业将数据从表 1 复制到表 2.但在此活动期间,我希望我的 Kafka 侦听器暂停,一旦复制完成,它应该继续.

To do the above things, I need to pause/resume the Kafka consumer using a scheduling job(already written) using quartz, which copies data from table 1 to table 2. But during this activity, I want my Kafka listener to pause, and once the copy is done, it should resume.

我的实现:

@KafkaListener(topicPartitions =
    { @TopicPartition(topic = "data_pipe", partitions = { "0" })})
public void listen(ConsumerRecord<String, String> cr) throws Exception {

推荐答案

如果你使用 'kafkaListener annotation' 自动创建的 KafkaListenerEndpointRegistry bean,那么,你可以像这样使用它:

if you use 'kafkaListener annotation' auto created KafkaListenerEndpointRegistry bean, so, You can use it like this code:

@Component
public class KafkaManager {

    private final KafkaListenerEndpointRegistry registry;

    public KafkaManager(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }
    public void pause() {
        registry.getListenerContainers().forEach(MessageListenerContainer::pause);
    }

    public void resume() {
        registry.getListenerContainers().forEach(MessageListenerContainer::resume);
    }
}

文档:https://docs.spring.io/spring-kafka/reference/html/#pause-resume

这篇关于如何使用 spring-kafka 暂停和恢复@KafkaListener的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆