消费者重新启动后,kafka-python 从最后生成的消息中读取 [英] kafka-python read from last produced message after a consumer restart

查看:34
本文介绍了消费者重新启动后,kafka-python 从最后生成的消息中读取的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 kafka-python 来使用来自 kafka 队列的消息(kafka 0.10 版).2.0).特别是我使用 KafkaConsumer 类型.如果消费者停止并在一段时间后重新启动,我想从最新生成的消息重新启动,即删除消费者关闭期间生成的所有消息.我怎样才能做到这一点?

i am using kafka-python to consume messages from a kafka queue (kafka version 0.10.2.0). In particular i am using KafkaConsumer type. If the consumer stops and after a while it is restarted i would like to restart from the latest produced message, that is drop all the messages produced during the time the consumer was down. How can i achieve this?

谢谢

推荐答案

你不会seekToEnd() 到日志的末尾.

You will not to seekToEnd() to the end of the log.

请记住,您首先需要订阅主题,然后才能搜索.此外,订阅是懒惰的.因此,您还需要先添加虚拟投票",然后才能进行搜索.

Keep in mind, that you first need to subscribe to a topic before you can seek. Also, subscribing is lazy. Thus, you will need to add a "dummy poll" before you can seek, too.

consumer.subscribe(...)
consumer.poll() // dummy poll
consumer.seekToEnd()

// now enter your regular poll-loop

这篇关于消费者重新启动后,kafka-python 从最后生成的消息中读取的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆