kafka-python-如何提交分区? [英] kafka-python - How do I commit a partition?
问题描述
使用kafka-python-1.0.2.
Using kafka-python-1.0.2.
如果我有一个包含10个分区的主题,那么如何在循环遍历各个分区和消息的同时提交特定的分区.我只是似乎在任何地方(无论是在文档中还是在其他地方)都找不到此示例
If I have a topic with 10 partitions, how do I go about committing a particular partition, while looping through the various partitions and messages. I just cant seem find an example of this anywhere, in the docs or otherwise
从文档中,我想使用:
consumer.commit(offset=offsets)
具体来说,如何创建偏移量所需的分区和OffsetAndMetadata字典(字典,可选)– {TopicPartition:OffsetAndMetadata}.
Specifically, how do I create the partition and OffsetAndMetadata dictionary required for offsets (dict, optional) – {TopicPartition: OffsetAndMetadata}.
我希望函数调用就像这样:
I was hoping the function call would just be something like:
consumer.commit(partition, offset)
但事实并非如此.
谢谢.
推荐答案
所以看来我可能已经想通了,很有趣,当您写下问题时会如何发生.这似乎可行:
So it looks like I may have figured it out, funny how that happens when you write down your questions. This seems to work:
meta = consumer.partitions_for_topic(topic)
options = {}
options[partition] = OffsetAndMetadata(message.offset + 1, meta)
consumer.commit(options)
需要进行更多测试,但是如果有任何更改,它将进行更新.
More testing is needed, but will update if anything changes.
这篇关于kafka-python-如何提交分区?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!