如何暂停 kafka 消费者? [英] How to pause a kafka consumer?

查看:350
本文介绍了如何暂停 kafka 消费者?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的框架中使用了 Kafka 生产者 - 消费者模型.在消费者端消费的记录稍后会被索引到 elasticsearch 上.这里我有一个用例,如果 ES 关闭,我将不得不暂停 kafka 消费者直到 ES 启动,一旦它启动,我需要恢复消费者并从我上次离开的地方消费记录.我认为@KafkaListener 无法实现这一点.任何人都可以请给我一个解决方案吗?我发现我需要为此编写自己的 KafkaListenerContainer,但我无法正确实现它.任何帮助将不胜感激.

I am using a Kafka producer - consumer model in my framework. The record consumed at the consumer end is later indexed onto the elasticsearch. Here i have a use case where if the ES is down, I will have to pause the kafka consumer until the ES is up, Once it is up, I need to resume the consumer and consume the record from where I last left. I don't think this can be achieved with @KafkaListener. Can anyone please give me a solution for this? I figured out that I need to write my own KafkaListenerContainer for this, but I am not able to implement it correctly. Any help would be much appreciated.

推荐答案

我建议暂停 consumer ,为什么你不能一次又一次地重试相同的消息并在消息消费成功后提交偏移量.

I would suggest rather pausing consumer , why can't you retry the same message again and again and commit offset once message is consumed successfully.

例如:

@Retryable 注释你的方法

并使用 try/catch 阻止您的方法并在 catch 块中抛出新异常.

And block your method with try/catch and throw new exception in catch block.

为 ListenerFactory 配置添加属性:

For ListenerFactory configuration add property:

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(false);

这篇关于如何暂停 kafka 消费者?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆