Kafka:如何根据时间戳消费数据 [英] Kafka: How to consume data based on Timestamp
问题描述
我想知道除了偏移量之外是否还有其他方式来获取时间间隔方面的数据?比如说,我想消耗昨天的所有日期,我该怎么做?
I want know whether there's some way other than offset to fetch data with respect to time interval? Say, I want to consume all the date of yesterday, how do I do it?
推荐答案
使用 offsetsForTimes 以获得与所需时间戳相关的正确偏移量.在 Python 中,它将如下所示:
Use offsetsForTimes to get right offset related to the required timestamp. In Python it will be like next:
from datetime import datetime
from kafka import KafkaConsumer, TopicPartition
topic = "www.kilskil.com"
broker = "localhost:9092"
# lets check messages of the first day in New Year
date_in = datetime(2019,1,1)
date_out = datetime(2019,1,2)
consumer = KafkaConsumer(topic, bootstrap_servers=broker, enable_auto_commit=True)
consumer.poll() # we need to read message or call dumb poll before seeking the right position
tp = TopicPartition(topic, 0) # partition n. 0
# in simple case without any special kafka configuration there is only one partition for each topic channel
# and it's number is 0
# in fact you asked about how to use 2 methods: offsets_for_times() and seek()
rec_in = consumer.offsets_for_times({tp:date_in.timestamp() * 1000})
rec_out = consumer.offsets_for_times({tp:date_out.timestamp() * 1000})
consumer.seek(tp, rec_in[tp].offset) # lets go to the first message in New Year!
c = 0
for msg in consumer:
if msg.offset >= rec_out[tp].offset:
break
c += 1
# message also has .timestamp field
print("{c} messages between {_in} and {_out}".format(c=c, _in=str(date_in), _out=str(date_out)))
不要忘记 Kafka 以毫秒为单位测量时间戳,并且它具有 long 类型.Python lib datetime 以秒为单位返回时间戳,因此我们需要将其乘以 1000.方法 offsets_for_times
返回一个带有 TopicPartition
键和 OffsetAndTimestamp
值的字典.
Don't forget that Kafka measures timestamp in milliseconds and it have long type. Python lib datetime return timestamps in seconds so we need to multiply it by 1000. Method offsets_for_times
returns a dict with TopicPartition
keys and OffsetAndTimestamp
values.
这篇关于Kafka:如何根据时间戳消费数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!