Can I create an RDD from a Kafka topic if I do not know the until offset?


Question

KafkaUtils.createRDD takes an array of offset ranges as a parameter, and each range needs an until offset. I do not know the until offset of the topic I want to read from. I only want to read at most the first 30 messages in the topic.

I see there is a KafkaCluster.getLatestLeaderOffsets method, but it is annotated as a Developer API.

Is there any public way to determine the earliest and latest offsets for a topic?

Recommended answer

It isn't that simple, because only the individual brokers (specifically, the leader of each partition) know the latest offset for a given topic/partition.

You can send an OffsetRequest. The following function returns the earliest and latest offsets for a single topic/partition (it's Scala, but the idea should carry over if you don't use Scala).

Note that you have to use a SimpleConsumer connected to the broker that is the leader for the requested partition. What I usually do is create a SimpleConsumer for each of my brokers, send a metadata request to get the partition-to-leader mapping, and then, for each partition, call this:

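```scala
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.javaapi.consumer.SimpleConsumer

// Returns (earliest, latest) for one partition; `consumer` must be connected to its leader.
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int): (Long, Long) = {
  // Up to 1000 offsets before LatestTime come back newest-first: the log-end offset, then
  // older segment base offsets, so the last one is the earliest if 1000 reaches that far.
  val reqInfo = java.util.Collections.singletonMap( // the javaapi request needs a java.util.Map
    TopicAndPartition(topic, partition),
    PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1000))
  val req = new kafka.javaapi.OffsetRequest(reqInfo, OffsetRequest.CurrentVersion, "offReq")
  val offsets = consumer.getOffsetsBefore(req).offsets(topic, partition)
  if (offsets.nonEmpty) (offsets.last, offsets.head) // (earliest, latest)
  else (0L, -1L)
}
```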
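To connect this back to the question, here is a minimal, library-free sketch of the remaining steps. It assumes you already have a partition-to-leader map from the metadata request and one consumer per broker. The names OffsetPlanning, offsetsForTopic and cappedRanges are hypothetical, and the Kafka call is passed in as a function (for example getOffsets above) so the logic can be shown without a running broker. Kafka only orders messages within a partition, so "the first 30 messages in the topic" is assumed here to mean filling partitions in ascending partition-id order until 30 messages are reached.

```scala
// Hypothetical helpers for illustration only: OffsetPlanning, offsetsForTopic and
// cappedRanges are not part of Kafka or Spark. The Kafka call is passed in as a function.
object OffsetPlanning {
  // (earliest, latest) as returned by getOffsets; latest is the log-end offset.
  type Bounds = (Long, Long)

  // Query each partition through the consumer connected to its leader broker.
  def offsetsForTopic[C](
      leaders: Map[Int, String], // partition id -> leader broker key (assumed "host:port")
      consumers: Map[String, C]  // broker key -> one consumer per broker
  )(fetch: (C, Int) => Bounds): Map[Int, Bounds] =
    leaders.map { case (partition, broker) => partition -> fetch(consumers(broker), partition) }

  // Spend a total message budget on partitions in ascending id order and return
  // (partition, fromOffset, untilOffset) with untilOffset exclusive.
  def cappedRanges(bounds: Map[Int, Bounds], maxMessages: Long): Seq[(Int, Long, Long)] =
    bounds.toSeq.sortBy(_._1)
      .foldLeft((maxMessages, Vector.empty[(Int, Long, Long)])) {
        case ((remaining, acc), (partition, (earliest, latest))) =>
          val available = math.max(0L, latest - earliest) // getOffsets' (0, -1) counts as empty
          val take = math.min(available, remaining)
          (remaining - take, acc :+ ((partition, earliest, earliest + take)))
      }
      ._2
      .filter { case (_, from, until) => until > from } // drop empty ranges
}
```

Each resulting tuple could then become OffsetRange.create(topic, partition, from, until) for KafkaUtils.createRDD. The until offset there is exclusive, which is why the log-end offset returned by getOffsets can serve directly as the upper bound and why the range ends at earliest + take.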

Hope this helps.
