如何获取时间戳上指定的 kafka 偏移数据 [英] How to get kafka offset data, specified on timestamp

查看:38
本文介绍了如何获取时间戳上指定的 kafka 偏移数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我尝试运行它时,我已经尝试根据时间戳从 Kafka 主题获取偏移量,它抛出空指针错误,

I've tried to get the offset from Kafka topic based on timestamp when I tried to run it was throwing null pointer error,

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
              for (TopicPartition partition : partitions) {
                timestampsToSearch.put(partition,  startTimestamp);
              }
Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);
              for (TopicPartition partition : partitions) {
                Long seekOffset = outOffsets.get(partition).offset();
consumer.seek(partition, seekOffset);

任何帮助将不胜感激.

推荐答案

要查找与时间戳对应的偏移量,您需要使用 offsetsForTimes() 方法.

To find the offsets that correspond to a timestamp, you need to use the offsetsForTimes() method.

例如,这将打印 mytopic 的分区 0 对应于 1 秒前的偏移量:

For example, this will print the offsets for partition 0 of mytopic that correspond to 1 second ago:

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
    Map<TopicPartition, Long> timestamps = new HashMap<>();
    timestamps.put(new TopicPartition("mytopic", 0), System.currentTimeMillis()-1*1000);
    Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
    System.err.println(offsets);
}

这将显示如下内容:

{offset-test-0=(timestamp=1561469319192, leaderEpoch=0, offset=100131)}

这篇关于如何获取时间戳上指定的 kafka 偏移数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆