如何暂停卡夫卡消费者? [英] How to pause a kafka consumer?

查看:287
本文介绍了如何暂停卡夫卡消费者?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的框架中使用Kafka生产者 - 消费者模型。消费者端消耗的记录随后被索引到elasticsearch上。在这里我有一个用例,如果ES关闭,我将不得不暂停kafka消费者直到ES启动,一旦启动,我需要恢复消费者并消耗我上次离开的记录。
我不认为这可以用@KafkaListener实现。有谁能请给我一个解决方案吗?我想我需要为此编写自己的KafkaListenerContainer,但我无法正确实现它。任何帮助都将非常感激。

I am using a Kafka producer - consumer model in my framework. The record consumed at the consumer end is later indexed onto the elasticsearch. Here i have a use case where if the ES is down, I will have to pause the kafka consumer until the ES is up, Once it is up, I need to resume the consumer and consume the record from where I last left. I don't think this can be achieved with @KafkaListener. Can anyone please give me a solution for this? I figured out that I need to write my own KafkaListenerContainer for this, but I am not able to implement it correctly. Any help would be much appreciated.

推荐答案

我建议相当暂停消费者,为什么你不能再次重试同样的消息一旦消息成功使用,再次提交偏移量。

I would suggest rather pausing consumer , why can't you retry the same message again and again and commit offset once message is consumed successfully.

例如:

使用<$ c注释方法$ c> @Retryable

使用try / catch阻止你的方法并在catch块中抛出新异常。

And block your method with try/catch and throw new exception in catch block.

对于ListenerFactory配置,添加属性:

For ListenerFactory configuration add property:

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(false);

这篇关于如何暂停卡夫卡消费者?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆