How to get Kafka messages based on timestamp

Problem description

I am working on an application in which I am using Kafka, and the tech stack is Scala. My Kafka consumer code is as follows:

    import java.time.Duration
    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import scala.jdk.CollectionConverters._  // Scala 2.13; on 2.12 use scala.collection.JavaConverters._

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "consumer-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList(topic))  // topic is defined elsewhere in the application
    val record = consumer.poll(Duration.ofMillis(500)).asScala.toList

It gives me all the records, but the problem is that the topic already contains data, which may lead to duplicates: records with the same key can already be in the topic. Is there any way to retrieve data only from a particular time onward? That is, before polling, can I take the current time and then retrieve only those records that arrive after it? Is there any way to achieve this?

Recommended answer

The only way to consume from any given timestamp is to:

  1. Look up the starting offsets with offsetsForTimes
  2. seek to (and commitSync) that result
  3. Begin polling (a sketch of these steps follows this list)
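
A minimal sketch of those three steps, with a few assumptions of my own: it reuses props and topic from the question, switches from subscribe to assign so that seek can be called before the first poll, picks an arbitrary one-hour cutoff as the timestamp, and relies on Scala 2.13's scala.jdk.CollectionConverters.

    import java.time.{Duration, Instant}
    import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
    import org.apache.kafka.common.TopicPartition
    import scala.jdk.CollectionConverters._

    val consumer = new KafkaConsumer[String, String](props)  // props as configured in the question

    // Assign every partition of the topic explicitly so seek() is valid right away.
    val partitions = consumer.partitionsFor(topic).asScala
      .map(p => new TopicPartition(p.topic, p.partition))
    consumer.assign(partitions.asJava)

    // 1. Look up, per partition, the earliest offset whose timestamp is at or after
    //    the cutoff (the one-hour cutoff below is only a placeholder).
    val startTs: java.lang.Long = Instant.now().minusSeconds(3600).toEpochMilli
    val offsets = consumer.offsetsForTimes(partitions.map(_ -> startTs).toMap.asJava)

    // 2. Seek to (and commit) those offsets; a null value means the partition has
    //    no record at or after the cutoff.
    offsets.asScala.foreach {
      case (tp, oat) if oat != null =>
        consumer.seek(tp, oat.offset())
        consumer.commitSync(Map(tp -> new OffsetAndMetadata(oat.offset())).asJava)
      case _ => ()
    }

    // 3. Begin polling; only records produced at or after startTs are returned.
    val recent = consumer.poll(Duration.ofMillis(500)).asScala.toList

If you keep subscribe() instead of assign(), the seek has to happen inside a ConsumerRebalanceListener.onPartitionsAssigned callback, because the group only assigns partitions once polling has started.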

But you need to be aware that the data stream is continuous, so repeated keys may show up again later.

If the data contains the same key more than once and you only want to see the latest value for each key, then you'd be better off using a KTable.
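
The answer only names a KTable, so the following Kafka Streams sketch is my own illustration rather than anything from the original post: it assumes the kafka-streams-scala library is on the classpath, reads the question's topic, and writes the latest value per key to a hypothetical output topic called latest-per-key.

    import java.util.Properties
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.serialization.Serdes._  // org.apache.kafka.streams.scala.Serdes on older releases
    import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

    val builder = new StreamsBuilder()
    val latest = builder.table[String, String](topic)  // changelog view: one current value per key
    latest.toStream.to("latest-per-key")               // hypothetical downstream topic

    val streamsProps = new Properties()
    streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-per-key-app")
    streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    new KafkaStreams(builder.build(), streamsProps).start()

Because a KTable treats the topic as a changelog, downstream you only ever see the most recent value for each key; the source topic is usually configured with log compaction for this pattern.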
