Kafka Consumer: How to start consuming from the last message in Python

Question

I am using Kafka 0.8.1 and kafka-python 0.9.0. My setup has 2 Kafka brokers. When I run my Kafka consumer, I can see it retrieving messages from the queue and keeping track of offsets for both brokers. Everything works great!

My issue is that when I restart the consumer, it starts consuming messages from the beginning. What I was expecting was that upon restart, the consumer would start consuming messages from where it left off before it died.

I did try keeping track of the message offsets in Redis and then calling consumer.seek before reading a message from the queue, to ensure that I only got messages I hadn't seen before. While this worked, I wanted to check with y'all before deploying this solution ... perhaps there is something I am misunderstanding about Kafka or the kafka-python client. It seems like a consumer being able to resume reading from where it left off is pretty basic functionality.

Thanks!

Recommended answer

Take care with the kafka-python library. It has a few minor issues.

If speed is not really a problem for your consumer, you can configure auto-commit to commit the offset after every message. That should work.
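A minimal sketch of that setup, assuming kafka-python 0.9.x, where SimpleConsumer accepts the auto_commit, auto_commit_every_n and auto_commit_every_t keyword arguments. The helper name make_consumer and the host, group and topic values are placeholders for illustration, not part of the original answer.

```python
# Sketch only: assumes kafka-python 0.9.x and its SimpleConsumer keyword
# arguments (auto_commit, auto_commit_every_n, auto_commit_every_t).

# Commit the offset after every single message (slow but simple).
COMMIT_EVERY_MESSAGE = {
    "auto_commit": True,
    "auto_commit_every_n": 1,     # commit after each consumed message
    "auto_commit_every_t": 5000,  # and at least every 5000 ms
}


def make_consumer(hosts, group, topic):
    """Build a SimpleConsumer that commits after every message.

    hosts, group and topic are placeholders supplied by the caller.
    """
    # Imported lazily so this module loads without kafka-python installed.
    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient(hosts)
    return SimpleConsumer(client, group, topic, **COMMIT_EVERY_MESSAGE)


# Hypothetical usage (needs a reachable broker):
#   consumer = make_consumer("localhost:9092", "my-group", "my-topic")
#   for message in consumer:
#       handle(message)  # handle() is your own processing function
```

Committing after every message costs throughput, which is why this only makes sense when speed is not a concern.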

SimpleConsumer provides a seek method (https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py#L174-L185) that lets you start consuming messages from whatever point you want.

The most common calls are:

  • consumer.seek(0, 0) to start reading from the beginning of the queue.
  • consumer.seek(0, 1) to start reading from current offset.
  • consumer.seek(0, 2) to skip all the pending messages and start reading only new messages.

The first argument is an offset relative to the position selected by the second argument. For example, if you call consumer.seek(5, 0) you will skip the first 5 messages in the queue.
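The seek(offset, whence) calls above follow the same convention as a file seek. The sketch below is a simplified, single-partition model of that arithmetic, not the library's code: resolve_seek and its earliest, current and latest parameters are hypothetical names, and it ignores how the real method handles topics with several partitions.

```python
def resolve_seek(offset, whence, earliest, current, latest):
    """Return the offset a seek(offset, whence) call would land on.

    Simplified single-partition model:
      whence == 0: relative to the earliest available offset
      whence == 1: relative to the current offset
      whence == 2: relative to the latest offset
    """
    if whence == 0:
        base = earliest
    elif whence == 1:
        base = current
    elif whence == 2:
        base = latest
    else:
        raise ValueError("whence must be 0, 1 or 2")
    return base + offset


# A partition holding offsets 10..39, with the consumer currently at 25.
start = resolve_seek(0, 0, earliest=10, current=25, latest=40)      # beginning
stay = resolve_seek(0, 1, earliest=10, current=25, latest=40)       # unchanged
tail = resolve_seek(0, 2, earliest=10, current=25, latest=40)       # new messages only
skip_five = resolve_seek(5, 0, earliest=10, current=25, latest=40)  # skip first 5
```

In this model, seek(0, 2) lands just past the last stored message, so only messages produced afterwards are read.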

Also, don't forget that offsets are stored per consumer group. Be sure you use the same group name every time.
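To see why the group name matters, here is a toy in-memory model of committed offsets keyed by consumer group. It is only an illustration of the idea, not how Kafka or kafka-python store offsets; OffsetStore and the group and topic names are made up for the example.

```python
class OffsetStore(object):
    """Toy model: committed offsets keyed by (group, topic, partition)."""

    def __init__(self):
        self._committed = {}

    def commit(self, group, topic, partition, offset):
        # Remember the next offset this group should read.
        self._committed[(group, topic, partition)] = offset

    def fetch(self, group, topic, partition):
        # None means this group has never committed for the partition.
        return self._committed.get((group, topic, partition))


store = OffsetStore()
store.commit("my-group", "my-topic", 0, 42)

same_group = store.fetch("my-group", "my-topic", 0)      # resumes at 42
other_group = store.fetch("my-group-2", "my-topic", 0)   # no saved position
```

A consumer restarted under a different group name finds no committed offset, which looks exactly like "starting from the beginning again".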
