kafka and Spark: Get first offset of a topic via API
Question
I am playing with Spark Streaming and Kafka (with the Scala API), and would like to read messages from a set of Kafka topics with Spark Streaming.
The following approach:
val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
reads from Kafka up to the latest available offset, but doesn't give me the metadata that I need (since I am reading from a set of topics, I need to know, for every message I read, which topic it came from), while this other method
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, currentOffsets, messageHandler)
explicitly wants an offset that I don't have.
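As an aside, the second overload is exactly what solves the "which topic did this message come from" problem: its messageHandler receives a MessageAndMetadata for every record, which carries the topic. Below is a minimal, hedged sketch of such a handler; MsgMeta is a stand-in case class for kafka.message.MessageAndMetadata[String, String] so the sketch runs without Kafka on the classpath, but the real handler has the same shape (a function from the metadata object to whatever record type you want in the stream):

```scala
// Stand-in for kafka.message.MessageAndMetadata[String, String]
// (the real class also exposes partition, offset, key, etc.)
final case class MsgMeta(topic: String, partition: Int, offset: Long, message: String)

object MessageHandlerSketch {
  // Pair each payload with its source topic, so every element of the
  // resulting DStream remembers which topic it was read from.
  val messageHandler: MsgMeta => (String, String) =
    mmd => (mmd.topic, mmd.message)

  def main(args: Array[String]): Unit = {
    val sample = MsgMeta("events", 0, 42L, "hello")
    println(messageHandler(sample)) // prints (events,hello)
  }
}
```

With the real API you would pass `(mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message)` as the messageHandler argument of createDirectStream.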
I know that there is this shell command that gives you the last offset (--time -1 asks for the latest offset; --time -2 would ask for the earliest):
kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list <broker>:<port>
--topic <topic-name> --time -1 --offsets 1
and KafkaCluster.scala is a developer-facing API that used to be public and gives you exactly what I would like.
Any hints?
Answer
You can use the code from GetOffsetShell.scala (see the Kafka API documentation):
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// time = -2 (OffsetRequest.EarliestTime) returns the first available offset, -1 (LatestTime) the last
val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
Or you can create a new consumer with a unique groupId and use it to get the first offset:
import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val consumer = new KafkaConsumer[String, String](createConsumerConfig(config.brokerList))
consumer.partitionsFor(config.topic).asScala.foreach { pi =>
  val topicPartition = new TopicPartition(pi.topic(), pi.partition())
  // assign and seekToBeginning take Java collections, not Scala Lists
  consumer.assign(Collections.singletonList(topicPartition))
  consumer.seekToBeginning(Collections.singletonList(topicPartition))
  val firstOffset = consumer.position(topicPartition)
  // ...
}
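Once the first offset of every partition has been collected (for example inside the loop above), you can assemble the fromOffsets map that the offset-taking createDirectStream overload expects. A minimal sketch of that assembly step, where (topic, partition) tuples stand in for kafka.common.TopicAndPartition so it runs without Kafka on the classpath:

```scala
object FromOffsetsSketch {
  // Turn (topic, partition, firstOffset) triples into the map that
  // createDirectStream's fromOffsets parameter expects; with the real
  // client the key would be TopicAndPartition(topic, partition).
  def buildFromOffsets(firstOffsets: Seq[(String, Int, Long)]): Map[(String, Int), Long] =
    firstOffsets.map { case (topic, partition, offset) => (topic, partition) -> offset }.toMap

  def main(args: Array[String]): Unit = {
    // Hypothetical values, as if collected by the consumer loop above
    val collected = Seq(("events", 0, 0L), ("events", 1, 7L), ("logs", 0, 3L))
    println(buildFromOffsets(collected).size) // prints 3
  }
}
```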