Retrieve timestamp-based data from Kafka


Problem Description

How can I get messages or data from a Kafka cluster for a specified day, for example 13 September? Can anyone provide code for this? I have googled it and found only theory, but I want the code.

Recommended Answer

There is no access method for this. Also, before Kafka v0.10, messages did not contain any timestamp information, so it is impossible to know when a message was written to a topic.

As of Kafka v0.10, each message contains a metadata timestamp attribute that is set either by the producer at message creation time or by the broker at message insertion time. A time-based index is planned but not yet available. Thus, you need to consume the whole topic and check the timestamp field (ignoring all messages you are not interested in). To find the starting point faster, you could also do a binary search over offsets with regard to timestamps to locate the first relevant message. A sketch of the brute-force scan follows.
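A minimal sketch of this scan with the Java client, under several assumptions: a broker at localhost:9092, a hypothetical single-partition topic my-topic, and "13 September" taken to mean 2016-09-13 in UTC. It reads everything that exists in the partition at startup and filters by record timestamp:

import java.time.Duration;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class TimestampScan {
    public static void main(String[] args) {
        // Interval of interest: 13 September, assumed here to be 2016 in UTC.
        long startTs = LocalDate.of(2016, 9, 13).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        long endTs   = LocalDate.of(2016, 9, 14).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("group.id", "timestamp-scan");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // assumption: single partition
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));

            // Scan up to the current end of the partition, keeping records in the interval.
            long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            while (consumer.position(tp) < endOffset) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    long ts = record.timestamp();
                    if (ts >= startTs && ts < endTs) {
                        System.out.printf("offset=%d ts=%d value=%s%n",
                                record.offset(), ts, record.value());
                    }
                }
            }
        }
    }
}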

Update:

Kafka 0.10.1 adds a time-based index. It allows you to seek to the first record with a timestamp equal to or larger than a given timestamp. You can use it via KafkaConsumer#offsetsForTimes(). This returns the corresponding offsets, which you can feed into KafkaConsumer#seek(). You can then consume the data and check each record's timestamp field via ConsumerRecord#timestamp() to see when you can stop processing.
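A minimal sketch of the lookup step, reusing the hypothetical consumer and TopicPartition from the scan above:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

/** Seeks tp to the first offset whose record timestamp is >= startTs. */
static boolean seekToTimestamp(KafkaConsumer<String, String> consumer, TopicPartition tp, long startTs) {
    Map<TopicPartition, OffsetAndTimestamp> result =
            consumer.offsetsForTimes(Collections.singletonMap(tp, startTs));
    OffsetAndTimestamp entry = result.get(tp);
    if (entry == null) {
        // No record with timestamp >= startTs exists in this partition.
        return false;
    }
    consumer.seek(tp, entry.offset());
    return true;
}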

Note that data is strictly ordered by offset, not by timestamp. Thus, during processing you might get "late" records with a timestamp smaller than your start timestamp (you can simply skip over those records, as in the loop sketched below).
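Continuing the sketch: a consumption loop that skips such "late" records and, naively, stops at the first timestamp past the interval (the caveats below refine this stopping rule). process() is a hypothetical placeholder for your own handling:

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

static void readInterval(KafkaConsumer<String, String> consumer, long startTs, long endTs) {
    boolean done = false;
    while (!done) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
            long ts = record.timestamp();
            if (ts < startTs) {
                continue; // "late" record from before the interval: skip it
            }
            if (ts >= endTs) {
                done = true; // naive stop: first record past the interval
                break;
            }
            process(record); // hypothetical handler
        }
    }
}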

A more difficult problem is late-arriving records at the end of your search interval. After you get the first record with a timestamp larger than your search interval, there might still be later records with timestamps that fall inside your search interval (if those records were appended to the topic "late"). There is no way to know this, though. Thus, you might want to keep reading "some more" records and check whether any "late" records are among them. How many records "some more" means is a design decision you need to make yourself.

There is no general guideline, though. If you have additional knowledge about your "write pattern", it can help you define a good strategy for how many records to consume after your search interval has "ended". Of course, there are two default strategies: (1) stop at the very first record with a timestamp larger than your search interval (effectively ignoring any late-arriving records; if you use the "log append time" configuration, this is of course a safe strategy); (2) read to the end of the log. The latter is the safest strategy with regard to completeness but might result in prohibitive overhead (also note that, since records can be appended at any time and record "delay" can be arbitrarily large, a late record might even be appended after you reach end-of-log).

In practice, it is a good idea to think about a "max expected delay" and read until you get a record with a timestamp larger than this upper delay bound.
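A sketch of this strategy, reusing the imports and hypothetical process() from the loop above; maxDelayMs is an assumed application-specific bound on how late a record can arrive:

static void readIntervalWithMaxDelay(KafkaConsumer<String, String> consumer,
                                     long startTs, long endTs, long maxDelayMs) {
    boolean done = false;
    while (!done) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
            long ts = record.timestamp();
            if (ts >= endTs + maxDelayMs) {
                done = true; // past the upper delay bound: no more interval records expected
                break;
            }
            if (ts >= startTs && ts < endTs) {
                process(record); // hypothetical handler; records outside the interval are skipped
            }
        }
    }
}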

