在读取所有消息直到某个特定时间后停止 KafkaListener(Spring Kafka Consumer) [英] Stop KafkaListener ( Spring Kafka Consumer) after it has read all messages till some specific time

查看:21
本文介绍了在读取所有消息直到某个特定时间后停止 KafkaListener(Spring Kafka Consumer)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从单个分区主题安排我的消费过程.我可以使用 endpointlistenerregistry.start() 启动它,但我想在消耗了当前分区中的所有消息后停止它,即当我到达当前分区中的最后一个偏移量时.在我完成消费并关闭它之后,生产进入主题.我应该如何确保我已经阅读了所有消息,直到我启动调度程序并停止我的消费者?我正在为消费者使用 @Kafkalistener.

I am trying to schedule my consumption process from a single partition topic. I can start it using endpointlistenerregistry.start() but I want to stop it after I have consumed all the messages in current partition i.e. when I reach to last offset in current partition. Production into the topic is done after I have finished the consumption and close it. How should I achieve the assurance that I have read all the messages till the time I started scheduler and stop my consumer ? I am using @Kafkalistener for consumer.

推荐答案

设置 idleEventInterval 容器属性并添加一个 @EventListener 方法来监听 ListenerContainerIdleEvents.

Set the idleEventInterval container property and add an @EventListener method to listen for ListenerContainerIdleEvents.

然后停止容器.

这篇关于在读取所有消息直到某个特定时间后停止 KafkaListener(Spring Kafka Consumer)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆