Kafka:如何根据时间戳使用数据 [英] Kafka: How to consume data based on Timestamp

查看:103
本文介绍了Kafka:如何根据时间戳使用数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想知道是否存在除偏移量以外的其他方式来获取有关时间间隔的数据?说,我要用完昨天的所有日期,该怎么办?

解决方案

使用

别忘了Kafka以毫秒为单位测量时间戳,它具有 long 类型.Python lib datetime返回时间戳(以秒为单位),因此我们需要将其乘以1000.方法 offsets_for_times 返回具有 TopicPartition 键和 OffsetAndTimestamp 值的字典./p>

I want know whether there's some way other than offset to fetch data with respect to time interval? Say, I want to consume all the date of yesterday, how do I do it?

Use offsetsForTimes to get right offset related to the required timestamp. In Python it will be like next:

from datetime import datetime
from kafka import KafkaConsumer, TopicPartition

topic  = "www.kilskil.com" 
broker = "localhost:9092"

# lets check messages of the first day in New Year
date_in  = datetime(2019,1,1)
date_out = datetime(2019,1,2)

consumer = KafkaConsumer(topic, bootstrap_servers=broker, enable_auto_commit=True)
consumer.poll()  # we need to read message or call dumb poll before seeking the right position

tp      = TopicPartition(topic, 0) # partition n. 0
# in simple case without any special kafka configuration there is only one partition for each topic channel
# and it's number is 0

# in fact you asked about how to use 2 methods: offsets_for_times() and seek()
rec_in  = consumer.offsets_for_times({tp:date_in.timestamp() * 1000})
rec_out = consumer.offsets_for_times({tp:date_out.timestamp() * 1000})

consumer.seek(tp, rec_in[tp].offset) # lets go to the first message in New Year!

c = 0
for msg in consumer:
  if msg.offset >= rec_out[tp].offset:
    break

  c += 1
  # message also has .timestamp field

print("{c} messages between {_in} and {_out}".format(c=c, _in=str(date_in), _out=str(date_out)))

Don't forget that Kafka measures timestamp in milliseconds and it have long type. Python lib datetime return timestamps in seconds so we need to multiply it by 1000. Method offsets_for_times returns a dict with TopicPartition keys and OffsetAndTimestamp values.

这篇关于Kafka:如何根据时间戳使用数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆