How shall we read the Kafka topics in a given time range?


Question

I need to read the messages in a given time range out of a Kafka topic. The solution I can think of is to first find the offset that corresponds to the beginning of the time range on each partition, and then keep consuming messages until the offsets on all partitions are past the end of the time range. Is there a better approach to this problem? Thanks!

Recommended answer

Well, you definitely have to start by searching for the first offset that fits the opening of the time range.

This can be done using the KafkaConsumer#offsetsForTimes method.

The method accepts a Map<TopicPartition, Long> that maps each partition to a timestamp, and returns a Map<TopicPartition, OffsetAndTimestamp> in which each OffsetAndTimestamp describes the first message whose timestamp is equal to or greater than the one specified. If a partition has no such message, its entry in the returned map is null.
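To make the lookup rule concrete, here is a small in-memory model of it. It is only a sketch: the class TimestampLookup and the method firstOffsetAtOrAfter are hypothetical names, a plain array stands in for one partition, and none of this is Kafka's actual implementation. It only mirrors the "first message with a timestamp equal to or greater than the target" behaviour described above.

```java
import java.util.OptionalLong;

public class TimestampLookup {

    /**
     * Hypothetical helper, not part of Kafka: timestamps[i] is the timestamp of
     * the record stored at offset i of a single partition. Returns the first
     * offset whose timestamp is >= target.
     */
    public static OptionalLong firstOffsetAtOrAfter(long[] timestamps, long target) {
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= target) {
                return OptionalLong.of(offset);
            }
        }
        // Stands in for the null entry that offsetsForTimes returns for a
        // partition with no message at or after the target timestamp.
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] timestamps = {100L, 200L, 200L, 350L};
        System.out.println(firstOffsetAtOrAfter(timestamps, 150L)); // OptionalLong[1]
        System.out.println(firstOffsetAtOrAfter(timestamps, 200L)); // OptionalLong[1]
        System.out.println(firstOffsetAtOrAfter(timestamps, 400L)); // OptionalLong.empty
    }
}
```

The empty result is the case to watch for in real code: calling offset() on a null OffsetAndTimestamp throws a NullPointerException.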

From there, you can assign the partition to your consumer, seek to the offset returned, and iterate until the timestamp in the record exceeds the end of your time range.

Some pseudocode:

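import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public static void main(String[] args) {
    String topic = args[0]; // Java's args array starts at index 0
    long timestampBeginning = Long.parseLong(args[1]);
    long timestampEnd = Long.parseLong(args[2]);
    TopicPartition partition = new TopicPartition(topic, 0); // partition 0 only

    // createConsumer() is a placeholder for building a configured KafkaConsumer
    try (Consumer<Object, Object> consumer = createConsumer()) {
        OffsetAndTimestamp start = consumer.offsetsForTimes(
                Collections.singletonMap(partition, timestampBeginning))
                        .get(partition);
        if (start == null) {
            return; // no message at or after timestampBeginning
        }

        consumer.assign(Collections.singleton(partition)); // must assign before seeking
        consumer.seek(partition, start.offset());

        boolean done = false;
        while (!done) {
            ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(1));
            if (records.isEmpty()) {
                break; // simplification: treat an empty poll as the end of the partition
            }
            for (ConsumerRecord<Object, Object> record : records) {
                if (record.timestamp() > timestampEnd) {
                    done = true; // past the end of the time range
                    break; // or whatever
                }

                // handle record
            }
        }
    }
}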
