Kafka: How to consume data based on timestamp
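Problem description
I want to know whether there is some way other than offsets to fetch data for a time interval. Say I want to consume all the data from yesterday: how do I do it?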
Use offsetsForTimes to get the offset that corresponds to the required timestamp (in the Python client kafka-python the method is called offsets_for_times). Don't forget that Kafka measures timestamps in milliseconds, as a long, while Python's datetime library returns timestamps in seconds, so we need to multiply by 1000. In Python it looks like this:
from datetime import datetime
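from kafka import KafkaConsumer, TopicPartition

topic = "www.kilskil.com"
broker = "localhost:9092"

# let's check the messages of the first day of the New Year
# (naive datetimes are interpreted in the local time zone of this machine)
date_in = datetime(2019, 1, 1)
date_out = datetime(2019, 1, 2)

# in the simple case, without any special Kafka configuration, a topic has
# only one partition, and its number is 0
tp = TopicPartition(topic, 0)  # partition no. 0

# assign the partition manually instead of subscribing, so seek() works
# right away without a dummy poll(); consumer_timeout_ms ends the loop
# below if no new message arrives for 10 s
consumer = KafkaConsumer(bootstrap_servers=broker, enable_auto_commit=False,
                         consumer_timeout_ms=10000)
consumer.assign([tp])

# in fact you asked how to use 2 methods: offsets_for_times() and seek()
# Kafka expects integer milliseconds; datetime.timestamp() returns float seconds
rec_in = consumer.offsets_for_times({tp: int(date_in.timestamp() * 1000)})
rec_out = consumer.offsets_for_times({tp: int(date_out.timestamp() * 1000)})

c = 0
if rec_in[tp] is not None:  # None: no message at or after date_in
    # None for date_out: no newer message yet, so stop at the log end offset
    stop = rec_out[tp].offset if rec_out[tp] is not None else consumer.end_offsets([tp])[tp]
    consumer.seek(tp, rec_in[tp].offset)  # let's go to the first message of the New Year!
    for msg in consumer:
        if msg.offset >= stop:
            break
        c += 1  # message also has a .timestamp field (ms)
        if msg.offset + 1 >= stop:
            break  # last message before stop: don't wait for the next one

print("{c} messages between {_in} and {_out}".format(c=c, _in=str(date_in), _out=str(date_out)))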
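The script above hard-codes 1 January 2019 and uses naive datetimes, which Python interprets in local time. For the question's "all of yesterday", here is a minimal sketch that computes both bounds from a timezone-aware datetime. The helper names to_kafka_ms and yesterday_bounds_ms are hypothetical. The conversion uses integer timedelta division, so no float rounding creeps into the milliseconds:

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_kafka_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to Kafka's ms-since-epoch timestamp."""
    # exact integer division, unlike float math with dt.timestamp() * 1000
    return (dt - EPOCH) // timedelta(milliseconds=1)


def yesterday_bounds_ms(now: datetime) -> Tuple[int, int]:
    """Return [start, end) of the calendar day before `now`, in Kafka milliseconds.

    Day boundaries follow the time zone attached to `now`.
    """
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday_start = today_start - timedelta(days=1)
    return to_kafka_ms(yesterday_start), to_kafka_ms(today_start)


if __name__ == "__main__":
    now = datetime(2019, 1, 2, 15, 30, tzinfo=timezone.utc)
    print(yesterday_bounds_ms(now))  # (1546300800000, 1546387200000)
```

The two values can be passed to offsets_for_times({tp: start}) and offsets_for_times({tp: end}) in place of the date_in/date_out expressions. In production you would call it with datetime.now(timezone.utc) or with a datetime in the time zone that defines "yesterday" for you.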
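The method offsets_for_times returns a dict with TopicPartition keys and OffsetAndTimestamp values. Each value holds the offset and timestamp of the first message whose timestamp is at or after the requested one. The value is None when the partition has no such message.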
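Because any value in that dict may be None, and a topic can have more than one partition, it helps to turn the two lookups into per-partition offset ranges before seeking. This is a minimal sketch that works on plain dicts shaped like the results of offsets_for_times() and end_offsets(). The OffsetAndTimestamp namedtuple here is a hypothetical stand-in that carries only the offset and timestamp fields, so the logic runs without a broker:

```python
from collections import namedtuple
from typing import Dict, Hashable, Optional, Tuple

# Hypothetical stand-in for kafka-python's OffsetAndTimestamp (offset, timestamp only)
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", ["offset", "timestamp"])


def offset_ranges(
    start_hits: Dict[Hashable, Optional[OffsetAndTimestamp]],
    stop_hits: Dict[Hashable, Optional[OffsetAndTimestamp]],
    end_offsets: Dict[Hashable, int],
) -> Dict[Hashable, Tuple[int, int]]:
    """Map each partition to the [first, stop) offsets to read; skip empty ones."""
    ranges = {}
    for tp, hit in start_hits.items():
        if hit is None:
            continue  # no message at or after the start time in this partition
        stop_hit = stop_hits.get(tp)
        # no message at or after the stop time yet: read up to the log end offset
        stop = stop_hit.offset if stop_hit is not None else end_offsets[tp]
        if hit.offset < stop:
            ranges[tp] = (hit.offset, stop)
    return ranges


if __name__ == "__main__":
    start = {("t", 0): OffsetAndTimestamp(5, 100), ("t", 1): None}
    stop = {("t", 0): None, ("t", 1): None}
    print(offset_ranges(start, stop, {("t", 0): 12, ("t", 1): 3}))  # {('t', 0): (5, 12)}
```

With a real consumer, you could build the keys as TopicPartition(topic, p) for each p in consumer.partitions_for_topic(topic). Each resulting range then becomes one seek() plus a loop that stops at its end offset, as in the single-partition script.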