Spark - Get earliest and latest offset of Kafka without opening stream
Problem description
I am currently using spark-streaming-kafka-0-10_2.11 to connect my Spark application to a Kafka queue. For streams everything works fine. For one specific scenario, however, I need the whole content of the Kafka queue exactly once; for this I got the suggestion to use KafkaUtils.createRDD instead (SparkStreaming: Read Kafka Stream and provide it as RDD for further processing).
However, for spark-streaming-kafka-0-10_2.11 I cannot figure out how to get the earliest and latest offsets for my Kafka topic, which are needed to create the offset ranges I have to hand off to the createRDD method.
What is the recommended way to get those offsets without opening a stream? Any help would be greatly appreciated.
Solution
After reading several discussions, I am able to get the earliest or latest offset of a specific partition with:
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// Fetch the earliest offset of one partition via the old (pre-0.10) consumer API.
val consumer = new SimpleConsumer(host, port, timeout, bufferSize, "offsetfetcher")
val topicAndPartition = new TopicAndPartition(topic, initialPartition)
// Use OffsetRequest.LatestTime instead to get the latest offset.
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
offsets.head
Still, I do not know how to replicate the behaviour of the --from-beginning flag of the kafka-console-consumer.sh CLI with the KafkaUtils.createRDD approach.
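With the new consumer API that spark-streaming-kafka-0-10 is built on, one route is to fetch the per-partition bounds with KafkaConsumer's beginningOffsets and endOffsets methods and then build one offset range per partition for KafkaUtils.createRDD; reading from each partition's beginning offset up to its end offset is exactly the "from beginning, whole queue once" behaviour. The sketch below shows only the pure range-building step (broker, topic, and the helper name are illustrative assumptions, and the actual Kafka calls are indicated in comments since they need a running broker):

```scala
// Sketch: turn per-partition earliest/latest offsets into (partition, fromOffset,
// untilOffset) tuples - the values handed to OffsetRange.create(topic, partition,
// fromOffset, untilOffset) for KafkaUtils.createRDD.
//
// Fetching the two maps would look roughly like this (hypothetical names):
//   val consumer   = new KafkaConsumer[String, String](props)
//   val partitions = consumer.partitionsFor("my-topic").asScala
//     .map(pi => new TopicPartition(pi.topic, pi.partition)).asJava
//   val earliest   = consumer.beginningOffsets(partitions) // TopicPartition -> Long
//   val latest     = consumer.endOffsets(partitions)
def offsetRanges(earliest: Map[Int, Long], latest: Map[Int, Long]): Seq[(Int, Long, Long)] =
  earliest.keys.toSeq.sorted.map(p => (p, earliest(p), latest(p)))

// Example: two partitions, each read from its beginning to its current end.
val ranges = offsetRanges(Map(0 -> 0L, 1 -> 5L), Map(0 -> 10L, 1 -> 15L))
// ranges == Seq((0, 0L, 10L), (1, 5L, 15L))
```

Since endOffsets returns the offset of the *next* message to be written, using it as untilOffset (which is exclusive in OffsetRange) consumes everything currently in the partition.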