Kafka Consumer: How to start consuming from the last message in Python

Problem description

I am using Kafka 0.8.1 and kafka-python 0.9.0. In my setup, I have two Kafka brokers set up. When I run my Kafka consumer, I can see it retrieving messages from the queue and keeping track of offsets for both brokers. Everything works great!

My issue is that when I restart the consumer, it starts consuming messages from the beginning. What I was expecting was that upon restart, the consumer would start consuming messages from where it left off before it died.

I did try keeping track of the message offsets in Redis and then calling consumer.seek before reading a message from the queue to ensure that I was only getting messages I hadn't seen before. While this worked, before deploying this solution I wanted to check with y'all... perhaps there is something I am misunderstanding about Kafka or the kafka-python client. It seems like being able to restart reading from where it left off is pretty basic functionality for a consumer.

Thanks!
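The Redis-based workaround described above can be sketched as follows. This is a minimal, self-contained illustration rather than kafka-python code: a JSON file stands in for Redis, a Python list stands in for one partition's log, and the names FileOffsetStore and consume are hypothetical. Loading the stored offset plays the role that consumer.seek plays in the real setup.

```python
import json
import os
import tempfile


class FileOffsetStore:
    """Hypothetical stand-in for Redis: persists the next offset to read per partition."""

    def __init__(self, path):
        self.path = path

    def _read_all(self):
        if not os.path.exists(self.path):
            return {}
        with open(self.path) as f:
            return json.load(f)

    def load(self, partition):
        # Saved "next offset" for this partition, or 0 if nothing was saved yet.
        return self._read_all().get(str(partition), 0)

    def save(self, partition, next_offset):
        data = self._read_all()
        data[str(partition)] = next_offset
        with open(self.path, "w") as f:
            json.dump(data, f)


def consume(log, store, partition, limit):
    """Process up to `limit` messages from a simulated partition log,
    resuming at the offset saved by the previous run."""
    start = store.load(partition)  # plays the role of consumer.seek(...) on restart
    processed = []
    for offset in range(start, min(start + limit, len(log))):
        processed.append(log[offset])
        # Save only *after* processing: a crash re-delivers at most this one message.
        store.save(partition, offset + 1)
    return processed


if __name__ == "__main__":
    log = ["m0", "m1", "m2", "m3", "m4"]
    store = FileOffsetStore(os.path.join(tempfile.mkdtemp(), "offsets.json"))
    print(consume(log, store, partition=0, limit=3))  # ['m0', 'm1', 'm2']
    print(consume(log, store, partition=0, limit=3))  # "restart": ['m3', 'm4']
```

Saving the offset after a message is processed, rather than before, gives at-least-once behaviour: if the process dies between processing and saving, that message is delivered again on restart instead of being lost.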

Recommended answer

Take care with the kafka-python library. It has a few minor issues.

If speed is not really a concern for your consumer, you can enable auto-commit after every message. That should work.
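Why committing on every message helps can be seen with a small simulation. A restarted consumer resumes from its last committed offset, so anything processed after that commit is delivered again. The function below is a hypothetical model, not kafka-python code; it assumes commits happen after every commit_every processed messages, which is roughly what we understand the auto_commit_every_n parameter of SimpleConsumer to control in kafka-python 0.9.x.

```python
def replayed_after_restart(commit_every, processed_before_crash):
    """Count messages re-delivered after a crash when offsets are committed every N messages.

    Hypothetical model: the committed offset advances only in steps of `commit_every`,
    and a restarted consumer resumes from the last committed offset.
    """
    if commit_every < 1:
        raise ValueError("commit_every must be >= 1")
    committed = (processed_before_crash // commit_every) * commit_every
    return processed_before_crash - committed


for n in (1, 10, 100):
    print(n, replayed_after_restart(n, processed_before_crash=257))
```

With 257 messages processed before a crash, committing every 1, 10 and 100 messages replays 0, 7 and 57 messages respectively. The price of committing every message is one commit round trip per message, which is why it only makes sense when throughput is not a concern.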

SimpleConsumer provides a seek method (https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py#L174-L185) that lets you start consuming messages at whatever point you want.

The most common calls are:

  • consumer.seek(0, 0) to start reading from the beginning of the queue.
  • consumer.seek(0, 1) to start reading from the current offset.
  • consumer.seek(0, 2) to skip all the pending messages and start reading only new messages.

The first argument is an offset relative to those positions. That way, if you call consumer.seek(5, 0), you will skip the first 5 messages in the queue.
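The (offset, whence) convention above works like fseek, and can be modelled for a single partition as below. This is a hypothetical sketch, not the library's implementation: seek_target and the example offsets are ours, and with several partitions the real seek has to apply the offset per partition, which this sketch deliberately ignores. Here "earliest" means the oldest message still retained and "latest" means the offset the next new message will get.

```python
def seek_target(offset, whence, earliest, current, latest):
    """Return the new read position for one partition.

    whence: 0 = relative to the earliest available offset,
            1 = relative to the current offset,
            2 = relative to the latest offset.
    Hypothetical single-partition model; it does not talk to Kafka.
    """
    bases = {0: earliest, 1: current, 2: latest}
    if whence not in bases:
        raise ValueError("whence must be 0, 1 or 2, got %r" % (whence,))
    return bases[whence] + offset


# Example partition: offsets 100..149 are retained, the consumer is at 120,
# and the next message written will get offset 150.
EARLIEST, CURRENT, LATEST = 100, 120, 150
print(seek_target(0, 0, EARLIEST, CURRENT, LATEST))  # 100: replay from the beginning
print(seek_target(0, 1, EARLIEST, CURRENT, LATEST))  # 120: stay where we are
print(seek_target(0, 2, EARLIEST, CURRENT, LATEST))  # 150: only new messages
print(seek_target(5, 0, EARLIEST, CURRENT, LATEST))  # 105: skip the first 5 retained messages
```

Note that "the beginning of the queue" is the oldest retained message, which is not necessarily offset 0 once older messages have been deleted by retention.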

Also, don't forget that offsets are stored per consumer group. Make sure you use the same group every time.
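The point about consumer groups can be illustrated with a toy offset store keyed by group, topic and partition. This is a hypothetical model: the class GroupOffsets and its reset fallback are ours, loosely mirroring the idea of starting from the earliest or latest offset when nothing has been committed. Because committed offsets belong to a group, a consumer restarted under a different group id finds nothing to resume from.

```python
class GroupOffsets:
    """Hypothetical committed-offset store keyed by (group, topic, partition)."""

    def __init__(self):
        self._committed = {}

    def commit(self, group, topic, partition, next_offset):
        # Record the next offset this group should read from the partition.
        self._committed[(group, topic, partition)] = next_offset

    def start_position(self, group, topic, partition, earliest, latest, reset="earliest"):
        """Return where a (re)started consumer in `group` begins reading."""
        if reset not in ("earliest", "latest"):
            raise ValueError("reset must be 'earliest' or 'latest'")
        key = (group, topic, partition)
        if key in self._committed:
            return self._committed[key]  # resume where this group left off
        # Nothing committed for this group: fall back to the reset policy.
        return earliest if reset == "earliest" else latest


offsets = GroupOffsets()
offsets.commit("billing", "events", 0, 42)
print(offsets.start_position("billing", "events", 0, earliest=0, latest=90))     # 42
print(offsets.start_position("billing-v2", "events", 0, earliest=0, latest=90))  # 0
```

If a consumer starts from the beginning on every restart, one thing worth checking is whether anything is actually being committed for the group id in use, and whether that id stays the same between runs.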
