Kafka Consumer: How to start consuming from the last message in Python
Question
I am using Kafka 0.8.1 and kafka-python 0.9.0. In my setup, I have 2 Kafka brokers set up. When I run my Kafka consumer, I can see it retrieving messages from the queue and keeping track of offsets for both brokers. Everything works great!
My issue is that when I restart the consumer, it starts consuming messages from the beginning. What I was expecting was that upon restart, the consumer would start consuming messages from where it left off before it died.
I did try keeping track of the message offsets in Redis and then calling consumer.seek before reading a message from the queue, to ensure that I was only getting messages I hadn't seen before. While this worked, before deploying this solution I wanted to check with y'all ... perhaps there is something I am misunderstanding about Kafka or the kafka-python client. A consumer being able to resume reading from where it left off seems like pretty basic functionality.
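The external-checkpoint approach described above can be sketched as follows. `OffsetStore` is a hypothetical stand-in backed by a plain dict; in production it would wrap Redis GET/SET calls. All names here are illustrative, not part of any real API:

```python
# Hypothetical stand-in for a Redis-backed offset checkpoint. In a real
# deployment, load/save would call redis-py's GET/SET instead of a dict.
class OffsetStore:
    def __init__(self):
        self._offsets = {}  # (topic, partition) -> next offset to read

    def load(self, topic, partition, default=0):
        # Where to resume; fall back to the start if no checkpoint exists.
        return self._offsets.get((topic, partition), default)

    def save(self, topic, partition, offset):
        # Record the next offset to read after processing a message.
        self._offsets[(topic, partition)] = offset

# On startup the consumer reads the checkpoint and seeks to it, then
# saves the offset after each processed message:
store = OffsetStore()
store.save("my-topic", 0, 42)          # checkpoint after processing message 41
resume_at = store.load("my-topic", 0)  # restart would resume at offset 42
```

The restarted consumer would then pass `resume_at` to consumer.seek before entering its read loop.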
Thanks!
Recommended answer
Take care with the kafka-python library. It has a few minor issues.
If speed is not really a problem for your consumer, you can set auto-commit on every message. It should work.
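A minimal sketch of per-message auto-commit, assuming the kafka-python 0.9.x SimpleConsumer API (this class was removed in later releases). The broker address, group, and topic names are placeholders:

```python
def autocommit_consumer_kwargs():
    """Keyword arguments that make SimpleConsumer commit after every message."""
    # auto_commit_every_n=1 commits the offset after each message: slower,
    # but a restarted consumer replays at most the one in-flight message.
    return {"auto_commit": True, "auto_commit_every_n": 1}

if __name__ == "__main__":
    # Requires kafka-python 0.9.x and a running broker; names are illustrative.
    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient("localhost:9092")  # adjust to your broker address
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              **autocommit_consumer_kwargs())
    for message in consumer:
        print(message.message.value)
```

With these settings, offsets are persisted by the group as each message is consumed, so a restart resumes from the last committed position instead of the beginning.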
SimpleConsumer provides a seek method (https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py#L174-L185) that allows you to start consuming messages from whatever point you want.
The most common calls are:
consumer.seek(0, 0) to start reading from the beginning of the queue.
consumer.seek(0, 1) to start reading from the current offset.
consumer.seek(0, 2) to skip all the pending messages and start reading only new messages.
The first argument is an offset relative to those positions. That way, if you call consumer.seek(5, 0) you will skip the first 5 messages in the queue.
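The seek(offset, whence) semantics above can be illustrated with a small pure function. This mirrors the three whence modes for clarity; it is not SimpleConsumer's actual code, and head/current/tail are made-up positions for the example:

```python
def target_offset(offset, whence, head, current, tail):
    """Compute the absolute offset that seek(offset, whence) would land on."""
    # whence=0: relative to the head (oldest message) of the queue
    # whence=1: relative to the consumer's current offset
    # whence=2: relative to the tail (where new messages will arrive)
    base = {0: head, 1: current, 2: tail}[whence]
    return base + offset

# With head=0, current=40, tail=100:
target_offset(0, 0, 0, 40, 100)  # 0   -> re-read everything
target_offset(0, 1, 0, 40, 100)  # 40  -> continue from where we are
target_offset(0, 2, 0, 40, 100)  # 100 -> only new messages
target_offset(5, 0, 0, 40, 100)  # 5   -> skip the first 5 messages
```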
Also, don't forget that offsets are stored per consumer group. Be sure you are using the same one all the time.