我可以从卡夫卡的主题创建RDD如果我不知道,直到偏移? [英] Can I create an RDD from a kafka topic if I do not know the until offset?

查看:400
本文介绍了我可以从卡夫卡的主题创建RDD如果我不知道,直到偏移?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

<一个href=\"https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html#createRDD(org.apache.spark.SparkContext,%20scala.collection.immutable.Map,%20org.apache.spark.streaming.kafka.OffsetRange[],%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag)\"相对=nofollow> KafkaUtils.createRDD 取offsetRanges作为参数。我不知道,直到我想从阅读的主题偏移。我想在主题中最前30的消息阅读。

KafkaUtils.createRDD takes the offsetRanges as a parameter. I do not know the until offset of the topic I want to read from. I want to read at most the first 30 messages in the topic.

我看到有一个<一个href=\"https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/streaming/kafka/KafkaCluster.html#getLatestLeaderOffsets(scala.collection.immutable.Set)\"相对=nofollow> KafkaCluster.html#getLatestLeaderOffsets 但被注释为一个开发API。

I see there is a KafkaCluster.html#getLatestLeaderOffsets but that is annotated as a Develop API.

有没有公开的方式确定一个主题的最早和最晚的偏移?

Is there any public way to determine the earliest and latest offsets for a topic?

推荐答案

这不是那么简单的事情,因为只有个别经纪知道一个给定的主题/分区的最新信息偏移什么。

It's not that simple of a thing, because only the individual brokers know what the latest offset info for a given topic / partition is.

您可以做一个 OffsetRequest 。下面将返回的最早和最晚偏移量话题/分区(这是斯卡拉,但你应该能够得到的想法,如果你不使用斯卡拉)。

You can do an OffsetRequest. The following will return the earliest and latest offsets for a topic / partition (it's Scala, but you should be able to get the idea if you don't use Scala).

请注意,你必须使用 SimpleConsumer 连接到是请求分区领头羊的经纪人。通常我做的是,我创建了一个 SimpleConsumer 我的每个经纪人。然后我做了元数据的请求,并得到分区领头羊的映射,那么的foreach分区我这样做:

Note you have to use a SimpleConsumer connected to the broker that is the leader for the requested partition. Usually what I do is, I create a SimpleConsumer for each of my brokers. Then I do a meta data request and get the partition to leader mapping, then foreach partition I do this:

def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = {
  val time = kafka.api.OffsetRequest.LatestTime
  val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo]((new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000)))
  val req = new kafka.javaapi.OffsetRequest(reqInfo, kafka.api.OffsetRequest.CurrentVersion, "offReq")
  val resp = consumer.getOffsetsBefore(req)
  val offsets = resp.offsets(topic, partition)
  if (offsets.size > 0) (offsets(offsets.size - 1), offsets(0))
  else (0, -1)
}

希望这有助于。

这篇关于我可以从卡夫卡的主题创建RDD如果我不知道,直到偏移?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆