Kafka python consumer reading all the messages when started


Question

I am using the code below to read messages from a topic, and I am facing two issues. Whenever I start the consumer, it reads all the messages already in the queue. How do I read only the unread messages?

from kafka import KafkaConsumer


consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    consumer.commit()  # commit the current offset so it is not re-read next time
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

Answer

As @Kenji said, you have to commit the offsets with consumer.commit(). If you don't want to commit manually, you can enable auto-commit by passing enable_auto_commit=True to your KafkaConsumer. You may also want to tune auto_commit_interval_ms, which is the interval in milliseconds between automatic commits. See here: http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html.
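Putting the answer's settings together, a minimal sketch of a consumer that picks up only where the group left off might look like the following. It assumes a local broker at localhost:9092 and the same topic and group names as the question; auto_offset_reset='latest' is an extra assumption here, controlling what happens when the group has no committed offset at all.

```python
from kafka import KafkaConsumer

# Consumer configuration (a sketch -- adjust broker/topic/group for your cluster)
config = dict(
    group_id='my-group',
    bootstrap_servers=['localhost:9092'],
    enable_auto_commit=True,        # commit offsets automatically instead of consumer.commit()
    auto_commit_interval_ms=5000,   # auto-commit every 5 seconds
    auto_offset_reset='latest',     # if the group has no committed offset yet, start at the end
)

consumer = KafkaConsumer('my-topic', **config)
for message in consumer:
    # With committed offsets, a restarted consumer resumes here, not at the beginning
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
```

Note that auto-commit trades safety for convenience: if the process dies between an auto-commit and processing, a few messages may be re-delivered or skipped, whereas calling consumer.commit() after processing each message gives tighter control.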

