Kafka Spark directStream cannot get data

Question

I'm using the Spark directStream API to read data from Kafka. My code is as follows:

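import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// spark.streaming.kafka.maxRatePerPartition is a Spark setting,
// so it is set on SparkConf rather than passed in kafkaParams
val sparkConf = new SparkConf()
    .setAppName("testdirectStreaming")
    .set("spark.streaming.kafka.maxRatePerPartition", "100")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))

val kafkaParams = Map[String, String](
    "auto.offset.reset" -> "smallest",
    "metadata.broker.list" -> "10.0.0.11:9092"
)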
// Start all 3 partitions of the topic from offset 0
val fromOffsets: Map[TopicAndPartition, Long] = Map(
    TopicAndPartition("mytopic", 0) -> 0L,
    TopicAndPartition("mytopic", 1) -> 0L,
    TopicAndPartition("mytopic", 2) -> 0L
)

val kafkaData = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
    ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => mmd)

var offsetRanges = Array[OffsetRange]()
kafkaData.transform { rdd =>
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
}.map {
    _.message()
}.foreachRDD { rdd =>
    for (o <- offsetRanges) {
        println(s"---${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
    rdd.foreachPartition{ partitionOfRecords =>
        partitionOfRecords.foreach { line =>
            println("===============value:"+line)
        }
    }
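}

// Start the streaming job and block until it is stopped;
// no batches are processed until start() is called
ssc.start()
ssc.awaitTermination()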

I'm sure there is data in the Kafka cluster, but my code does not receive any of it. Thanks in advance.

Recommended answer

I found the cause: the old messages in Kafka had already been deleted because their retention period had expired. So when I set fromOffset to 0, which no longer existed in the partitions, it caused an OffsetOutOfRangeException. That exception made Spark reset the offsets to the latest ones, so I did not receive any messages. The solution is to set an appropriate fromOffset, one that still exists in each partition, so that the exception is avoided.
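As a minimal sketch of what "an appropriate fromOffset" could look like, the snippet below raises each requested offset to the earliest offset still retained for that partition. It assumes you have already looked up the earliest offsets yourself, for example with Kafka's `kafka.tools.GetOffsetShell` tool using `--time -2`. The names `TopicPartitionKey`, `OffsetClamp`, and `clampToEarliest`, and the offset values, are made up for illustration and are not part of Spark or Kafka. `TopicPartitionKey` is only a stand-in for `kafka.common.TopicAndPartition` so the example runs without Kafka on the classpath.

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition, so this sketch
// compiles without Kafka on the classpath.
final case class TopicPartitionKey(topic: String, partition: Int)

object OffsetClamp {
  // Start each partition at max(requested, earliest retained offset).
  // A partition with no known earliest offset keeps its requested offset.
  def clampToEarliest(
      requested: Map[TopicPartitionKey, Long],
      earliest: Map[TopicPartitionKey, Long]
  ): Map[TopicPartitionKey, Long] =
    requested.map { case (tp, offset) =>
      tp -> math.max(offset, earliest.getOrElse(tp, offset))
    }
}

object OffsetClampExample {
  // Offsets requested in the question: 0 for all three partitions.
  val requested: Map[TopicPartitionKey, Long] =
    (0 to 2).map(p => TopicPartitionKey("mytopic", p) -> 0L).toMap

  // Made-up earliest offsets, as if retention had removed older messages
  // from partitions 0 and 1 but not from partition 2.
  val earliest: Map[TopicPartitionKey, Long] = Map(
    TopicPartitionKey("mytopic", 0) -> 1500L,
    TopicPartitionKey("mytopic", 1) -> 1420L,
    TopicPartitionKey("mytopic", 2) -> 0L
  )

  // Offsets that are safe to pass as starting points.
  val fromOffsets: Map[TopicPartitionKey, Long] =
    OffsetClamp.clampToEarliest(requested, earliest)
}
```

In a real job the resulting map would be keyed by `TopicAndPartition` and passed as the `fromOffsets` argument of `createDirectStream`. If explicit starting offsets are not needed, another option is the `createDirectStream` overload that takes a set of topic names instead of `fromOffsets`; with `auto.offset.reset` set to `smallest` it should begin at the smallest offset still available in each partition.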
