Can I create an RDD from a Kafka topic if I do not know the until offset?
Question
KafkaUtils.createRDD (https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html#createRDD(org.apache.spark.SparkContext,%20scala.collection.immutable.Map,%20org.apache.spark.streaming.kafka.OffsetRange[],%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag)) takes the offsetRanges as a parameter. I do not know the until offset of the topic I want to read from. I want to read at most the first 30 messages in the topic.
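Whichever way the offsets end up being obtained, the "at most 30 messages" part is plain arithmetic once each partition's earliest offset and latest (log-end, exclusive) offset are known. A minimal sketch, assuming those bounds are already available as a `Map` from partition id to `(earliest, latest)`; `FirstN.firstNRanges` is a hypothetical helper, not part of Spark or Kafka. Because Kafka only orders messages within a partition, "the first 30 in the topic" has no single meaning, so filling partitions in ascending id order is an arbitrary choice:

```scala
object FirstN {
  /** Hypothetical helper: given (earliest, latest) offsets per partition,
    * pick [from, until) ranges that together cover at most `n` messages.
    * `latest` is treated as exclusive (the log-end offset), like untilOffset.
    * Partitions are filled in ascending id order (an arbitrary choice). */
  def firstNRanges(bounds: Map[Int, (Long, Long)], n: Long): Seq[(Int, Long, Long)] = {
    val (ranges, _) =
      bounds.toList.sortBy(_._1).foldLeft((Vector.empty[(Int, Long, Long)], n)) {
        case ((acc, remaining), (partition, (earliest, latest))) =>
          // Take no more than the partition holds and no more than the budget left.
          val take = math.min(math.max(0L, latest - earliest), remaining)
          (acc :+ ((partition, earliest, earliest + take)), remaining - take)
      }
    ranges
  }
}
```

Each resulting `(partition, from, until)` triple could then become `OffsetRange.create(topic, partition, from, until)` in the array passed to `KafkaUtils.createRDD`, skipping triples where `from == until`.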
I see there is KafkaCluster.getLatestLeaderOffsets (https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/streaming/kafka/KafkaCluster.html#getLatestLeaderOffsets(scala.collection.immutable.Set)), but it is annotated as a DeveloperApi.
Is there any public way to determine the earliest and latest offsets for a topic?
Answer
It's not that simple, because only the individual brokers (specifically, the leader of each partition) know the latest offset for a given topic/partition.
You can do an OffsetRequest. The following will return the earliest and latest offsets for a topic/partition (it's Scala, but you should be able to get the idea if you don't use Scala).
Note that you have to use a SimpleConsumer connected to the broker that is the leader for the requested partition. What I usually do is create a SimpleConsumer for each of my brokers. Then I do a metadata request to get the partition-to-leader mapping, and for each partition I do this:
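import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.javaapi.consumer.SimpleConsumer
import scala.collection.JavaConverters._

// Returns (earliest, latest) offsets for one topic/partition.
// `consumer` must be connected to the broker that leads this partition.
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int): (Long, Long) = {
  // Ask for up to 1000 offsets before LatestTime; the broker returns them newest first.
  val reqInfo = Map(TopicAndPartition(topic, partition) -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1000))
  // The javaapi OffsetRequest takes a java.util.Map, so convert the Scala Map.
  val req = new kafka.javaapi.OffsetRequest(reqInfo.asJava, OffsetRequest.CurrentVersion, "offReq")
  val resp = consumer.getOffsetsBefore(req)
  val offsets = resp.offsets(topic, partition)
  // offsets(0) is the latest (log-end) offset; the last entry is the oldest one returned.
  if (offsets.nonEmpty) (offsets.last, offsets(0))
  else (0L, -1L)
}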
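The "one SimpleConsumer per broker, then ask each partition's leader" loop described above can be sketched without any Kafka dependency by abstracting over the consumer type. A minimal sketch, assuming the partition-to-leader map has already been built from a metadata request (not shown); `PerPartitionOffsets.offsetsByPartition` and its parameter names are hypothetical, and `fetch` stands in for a call such as `getOffsets(consumer, topic, partition)`:

```scala
object PerPartitionOffsets {
  /** Hypothetical sketch of the per-partition loop.
    * leaders:   partition id -> id of the broker leading that partition
    *            (assumed to come from a topic metadata request)
    * consumers: broker id -> consumer connected to that broker
    *            (C stands in for SimpleConsumer)
    * fetch:     e.g. (c, p) => getOffsets(c, topic, p)
    * Returns partition id -> (earliest, latest). */
  def offsetsByPartition[C](
      leaders: Map[Int, Int],
      consumers: Map[Int, C],
      fetch: (C, Int) => (Long, Long)
  ): Map[Int, (Long, Long)] =
    leaders.map { case (partition, leaderId) =>
      // The request must go to the leader, so fail loudly if we have no consumer for it.
      val consumer = consumers.getOrElse(
        leaderId,
        throw new NoSuchElementException(s"no consumer for broker $leaderId (leader of partition $partition)")
      )
      partition -> fetch(consumer, partition)
    }
}
```

Keeping the consumer type as a parameter makes the loop easy to test; in real use `C` would be `SimpleConsumer`, and each consumer should be closed with `close()` once the offsets have been fetched.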
Hope this helps.