Kafka AvroConsumer使用offsets_for_times从时间戳消耗 [英] Kafka AvroConsumer consume from timestamp using offsets_for_times

查看:327
本文介绍了Kafka AvroConsumer使用offsets_for_times从时间戳消耗的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

尝试使用confluent_kafka.AvroConsumer消耗给定时间戳记中的消息.

Trying to use confluent_kafka.AvroConsumer to consume messages from a given time stamp.

if flag:

    # creating a list
    topic_partitons_to_search = list(
        map(lambda p: TopicPartition('my_topic2', p, int(time.time())), range(0, 1)))

    print("Searching for offsets with %s" % topic_partitons_to_search)
    offsets = c.offsets_for_times(topic_partitons_to_search, timeout=1.0)
    print("offsets_for_times results: %s" % offsets)

    for x in offsets:
        c.seek(x)
    flag=False 

控制台返回此

Searching for offsets with [TopicPartition{topic=my_topic2,partition=0,offset=1543584425,error=None}]
offsets_for_times results: [TopicPartition{topic=my_topic2,partition=0,offset=0,error=None}]
{'name': 'Hello'}
{'name': 'Hello'}
{'name': 'Hello1'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Offset 8'}
{'name': 'Offset 9'}
{'name': 'Offset 10'}
{'name': 'Offset 11'}
{'name': 'New'} 

这些都是我在my_topic2的分区0中的所有消息(在分区1中什么都没有),我们什么也得不到,因为我们没有从当前时间(time.time())产生的消息.然后,我希望能够使用time.time() - 60000之类的方法来获取最近60000毫秒内的所有消息

These are all my messages in partition 0 of my_topic2 (have nothing in partition 1), we should get nothing back because we have no messages produced from current time (time.time()). I would then like to be able to use something like time.time() - 60000 to get all the messages in the last 60000 miliseconds

推荐答案

Python的time.time()返回自该纪元以来的秒数,offsets_for_times使用距该纪元的毫秒数,因此当我发送时它要计算的日期比今天早得多,这意味着我们应该包括我的所有偏移量.

Pythons time.time() returns the amount of seconds since the epoch, the offsets_for_times uses the amount of milliseconds from the epoch, so when I was sending in amount of seconds it was calculating a date much earlier than today which meant we should include all my offsets.

这篇关于Kafka AvroConsumer使用offsets_for_times从时间戳消耗的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆