Can I create an RDD from a Kafka topic if I do not know the until offset?
Question
KafkaUtils.createRDD takes the offsetRanges as a parameter. I do not know the until offset of the topic I want to read from. I want to read at most the first 30 messages in the topic.
I see there is KafkaCluster#getLatestLeaderOffsets, but it is annotated as a DeveloperApi.
Is there any public way to determine the earliest and latest offsets for a topic?
Answer
It's not that simple, because only the individual brokers know the latest offset for a given topic/partition.
You can issue an OffsetRequest. The following returns the earliest and latest offsets for a topic/partition (it's Scala, but you should be able to follow the idea even if you don't use Scala).
Note that you have to use a SimpleConsumer connected to the broker that is the leader for the requested partition. What I usually do is create a SimpleConsumer for each of my brokers. Then I issue a metadata request to get the partition-to-leader mapping, and for each partition I do this:
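import scala.collection.JavaConverters._
import kafka.api.PartitionOffsetRequestInfo
import kafka.common.TopicAndPartition
import kafka.javaapi.consumer.SimpleConsumer

// Returns (earliest, latest) offsets for one partition; `consumer` must be connected to its leader.
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int): (Long, Long) = {
  val time = kafka.api.OffsetRequest.LatestTime
  // Up to 1000 offsets come back newest first: the log end offset, then segment start offsets.
  val reqInfo = Map(TopicAndPartition(topic, partition) -> PartitionOffsetRequestInfo(time, 1000))
  // kafka.javaapi.OffsetRequest expects a java.util.Map, so convert the Scala Map.
  val req = new kafka.javaapi.OffsetRequest(reqInfo.asJava, kafka.api.OffsetRequest.CurrentVersion, "offReq")
  val resp = consumer.getOffsetsBefore(req)
  val offsets = resp.offsets(topic, partition)
  if (offsets.nonEmpty) (offsets.last, offsets.head) // (earliest, latest)
  else (0L, -1L) // no offsets returned
}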
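To tie this back to the question, here is a minimal, self-contained sketch of the remaining two steps: running a per-partition offset lookup against each partition's leader, then turning the (earliest, latest) pairs into ranges that read at most N messages per partition. Everything here is hypothetical scaffolding: `leaders` (partition to leader host) stands in for the result of the metadata request, `fetch` stands in for `getOffsets(consumer, topic, partition)`, and `TopicRange` is a local stand-in for Spark's `OffsetRange` so the sketch runs without Kafka or Spark on the classpath. It assumes "the first 30 messages" means up to 30 per partition, since Kafka only orders messages within a partition.

```scala
object FirstMessages {
  // Hypothetical stand-in for Spark's OffsetRange; untilOffset is exclusive.
  final case class TopicRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  // Mirrors the answer's loop: look up each partition's leader, then query that broker.
  // `leaders` and `fetch` are placeholders for the metadata request and getOffsets.
  def offsetsByPartition[C](leaders: Map[Int, String], consumers: Map[String, C])(
      fetch: (C, Int) => (Long, Long)): Map[Int, (Long, Long)] =
    leaders.map { case (partition, host) => partition -> fetch(consumers(host), partition) }

  // For each partition, start at the earliest offset and read at most `maxPerPartition`
  // messages, never past the latest (log end) offset. Empty partitions yield from == until.
  def firstRanges(topic: String,
                  offsets: Map[Int, (Long, Long)],
                  maxPerPartition: Long): Seq[TopicRange] =
    offsets.toSeq.sortBy(_._1).map { case (partition, (earliest, latest)) =>
      val until = math.max(earliest, math.min(earliest + maxPerPartition, latest))
      TopicRange(topic, partition, earliest, until)
    }
}
```

With the real libraries, each `TopicRange` would map onto `OffsetRange.create(topic, partition, fromOffset, untilOffset)` from the Spark Kafka 0.8 integration, and the resulting array would be passed to `KafkaUtils.createRDD`. The latest offset returned by the request is the offset of the next message to be written, which lines up with `untilOffset` being exclusive.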
Hope this helps.