Spark - Get earliest and latest offset of Kafka without opening stream


Problem Description


I am currently using spark-streaming-kafka-0-10_2.11 to connect my Spark application with a Kafka queue. For streams everything works fine. For one specific scenario, however, I just need the whole content of the Kafka queue exactly once - for this I got the suggestion to use KafkaUtils.createRDD instead (SparkStreaming: Read Kafka Stream and provide it as RDD for further processing).

However, for spark-streaming-kafka-0-10_2.11 I cannot figure out how to get the earliest and latest offsets for my Kafka topic, which are needed to create the OffsetRange I have to hand off to the createRDD method.

What is the recommended way to get those offsets without opening a stream? Any help would be greatly appreciated.
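
For reference, the call I am trying to build looks roughly like this (a minimal sketch; "mytopic" and the literal offsets 0L/100L are placeholders - the real from/until values are exactly what I am missing):

import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

// sc is the SparkContext and kafkaParams the java.util.Map[String, Object]
// already used for the streaming case; one OffsetRange per topic partition
val offsetRanges = Array(OffsetRange("mytopic", 0, 0L, 100L))
val rdd = KafkaUtils.createRDD[String, String](
  sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)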

Solution

After reading several discussions, I am able to get the earliest or latest offset of a specific partition with:

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// host, port, timeout, bufferSize, topic and initialPartition are assumed to be defined
val consumer = new SimpleConsumer(host, port, timeout, bufferSize, "offsetfetcher")
val topicAndPartition = new TopicAndPartition(topic, initialPartition)
// OffsetRequest.EarliestTime asks for the first available offset;
// use OffsetRequest.LatestTime to get the latest one instead
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets

offsets.head

But still, how to replicate the behaviour of "from_beginning" in the kafka_consumer.sh CLI command with the KafkaUtils.createRDD approach is something I do not know.
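
One way to fill that gap is sketched below (untested; it assumes a Kafka client of version 0.10.1 or newer, which added KafkaConsumer.beginningOffsets/endOffsets, and "localhost:9092"/"mytopic" are placeholders): fetch the beginning and end offsets per partition with a plain KafkaConsumer, build one OffsetRange per partition, and hand those to KafkaUtils.createRDD. Covering every partition from its beginning offset to its end offset should reproduce "from_beginning" without opening a stream.

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

// assumption: broker address and topic name are placeholders, sc is an existing SparkContext
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "offsetfetcher"
).asJava

val consumer = new KafkaConsumer[String, String](kafkaParams)
val partitions = consumer.partitionsFor("mytopic").asScala
  .map(p => new TopicPartition(p.topic, p.partition)).asJava

// beginningOffsets/endOffsets only query broker metadata,
// so no subscription and no poll() is needed
val earliest = consumer.beginningOffsets(partitions).asScala
val latest = consumer.endOffsets(partitions).asScala
consumer.close()

// one OffsetRange per partition: earliest-to-latest is "from_beginning"
val offsetRanges = earliest.map { case (tp, from) =>
  OffsetRange(tp.topic, tp.partition, from, latest(tp))
}.toArray

val rdd = KafkaUtils.createRDD[String, String](
  sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)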
