kafka and Spark: Get first offset of a topic via API


Problem Description

I am playing with Spark Streaming and Kafka (using the Scala API), and would like to read messages from a set of Kafka topics with Spark Streaming.

The following approach:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

reads from Kafka up to the latest available offset, but doesn't give me the metadata I need (since I am reading from a set of topics, I need to know, for every message, which topic it came from), while the other overload, KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, currentOffsets, messageHandler), explicitly wants offsets that I don't have.
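
For illustration, here is a minimal sketch of how that second overload could be called once starting offsets are known; its messageHandler keeps the topic next to each payload, which the simpler overload drops. The fromOffsets values below are placeholders, to be filled in with one of the approaches from the answer.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder starting offsets: one entry per (topic, partition), value = first offset to read
val fromOffsets: Map[TopicAndPartition, Long] = Map(TopicAndPartition("topic-a", 0) -> 0L)

// Keep the topic name alongside each message payload
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)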

I know that there is this shell command that gives you the last offset:

kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list <broker>:<port> \
  --topic <topic-name> --time -1 --offsets 1
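
(As an aside, the same tool can report the earliest offset instead of the latest by passing --time -2; the broker and topic values are placeholders as above.)

kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list <broker>:<port> \
  --topic <topic-name> --time -2 --offsets 1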

And KafkaCluster.scala is a developer-facing API that used to be public and provides exactly what I'm looking for.
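
For reference, a rough sketch of how KafkaCluster could be used where it is accessible (it is private[spark] in some 1.x releases, so availability is an assumption here); kafkaParams and topics are the values from the snippet above, with topics a Set[String]:

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

val kc = new KafkaCluster(kafkaParams)

// .right.get throws if the broker metadata lookup fails; acceptable for a sketch
val partitions: Set[TopicAndPartition] = kc.getPartitions(topics).right.get
val earliestOffsets: Map[TopicAndPartition, Long] =
  kc.getEarliestLeaderOffsets(partitions).right.get.mapValues(_.offset).toMap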

Any hints?

Recommended Answer

You can use the code from GetOffsetShell.scala (see the Kafka API documentation):

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
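
To get the first offset specifically, the time value above would be the earliest-time constant of the old kafka.api Scala client (assumed here to be the client in use); offsets.head is then the first offset of that partition.

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}

// -2L asks the broker for the earliest available offset, -1L for the latest
val earliestInfo = PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)
val latestInfo   = PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)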

Or you can create a new consumer with a unique groupId and use it to get the first offset:

import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val consumer = new KafkaConsumer[String, String](createConsumerConfig(config.brokerList))
consumer.partitionsFor(config.topic).asScala.foreach { pi =>
  val topicPartition = new TopicPartition(pi.topic(), pi.partition())

  // Assign only this partition, rewind to its beginning, and read the position back
  consumer.assign(Collections.singletonList(topicPartition))
  consumer.seekToBeginning(Collections.singletonList(topicPartition)) // varargs on clients older than 0.10.1
  val firstOffset = consumer.position(topicPartition)
  // ...
}
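
If the goal is to feed these first offsets back into the offset-taking createDirectStream overload from the question, one way (a sketch under the same assumptions, reusing consumer and config from above) is to collect them into the map that overload expects:

import java.util.Collections
import scala.collection.JavaConverters._
import kafka.common.TopicAndPartition
import org.apache.kafka.common.TopicPartition

// One entry per partition of the topic: (topic, partition) -> first available offset
val fromOffsets: Map[TopicAndPartition, Long] =
  consumer.partitionsFor(config.topic).asScala.map { pi =>
    val tp = new TopicPartition(pi.topic(), pi.partition())
    consumer.assign(Collections.singletonList(tp))
    consumer.seekToBeginning(Collections.singletonList(tp))
    TopicAndPartition(pi.topic(), pi.partition()) -> consumer.position(tp)
  }.toMap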
