Kafka:如何根据时间戳消费数据 [英] Kafka: How to consume data based on Timestamp

查看:44
本文介绍了Kafka:如何根据时间戳消费数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想知道除了偏移量之外是否还有其他方式来获取时间间隔方面的数据?比如说,我想消耗昨天的所有日期,我该怎么做?

I want know whether there's some way other than offset to fetch data with respect to time interval? Say, I want to consume all the date of yesterday, how do I do it?

推荐答案

使用 offsetsForTimes 以获得与所需时间戳相关的正确偏移量.在 Python 中,它将如下所示:

Use offsetsForTimes to get right offset related to the required timestamp. In Python it will be like next:

from datetime import datetime
from kafka import KafkaConsumer, TopicPartition

topic  = "www.kilskil.com" 
broker = "localhost:9092"

# lets check messages of the first day in New Year
date_in  = datetime(2019,1,1)
date_out = datetime(2019,1,2)

consumer = KafkaConsumer(topic, bootstrap_servers=broker, enable_auto_commit=True)
consumer.poll()  # we need to read message or call dumb poll before seeking the right position

tp      = TopicPartition(topic, 0) # partition n. 0
# in simple case without any special kafka configuration there is only one partition for each topic channel
# and it's number is 0

# in fact you asked about how to use 2 methods: offsets_for_times() and seek()
rec_in  = consumer.offsets_for_times({tp:date_in.timestamp() * 1000})
rec_out = consumer.offsets_for_times({tp:date_out.timestamp() * 1000})

consumer.seek(tp, rec_in[tp].offset) # lets go to the first message in New Year!

c = 0
for msg in consumer:
  if msg.offset >= rec_out[tp].offset:
    break

  c += 1
  # message also has .timestamp field

print("{c} messages between {_in} and {_out}".format(c=c, _in=str(date_in), _out=str(date_out)))

不要忘记 Kafka 以毫秒为单位测量时间戳,并且它具有 long 类型.Python lib datetime 以秒为单位返回时间戳,因此我们需要将其乘以 1000.方法 offsets_for_times 返回一个带有 TopicPartition 键和 OffsetAndTimestamp 值的字典.

Don't forget that Kafka measures timestamp in milliseconds and it have long type. Python lib datetime return timestamps in seconds so we need to multiply it by 1000. Method offsets_for_times returns a dict with TopicPartition keys and OffsetAndTimestamp values.

这篇关于Kafka:如何根据时间戳消费数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆