Spark Streaming + Kafka: SparkException: Couldn't find leader offsets for Set

Problem description

I'm trying to set up Spark Streaming to get messages from a Kafka queue. I'm getting the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o30.createDirectStream.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test-topic,0])
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)

Here is the code I'm executing (pyspark):

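from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext.getOrCreate()  # reuses the pyspark shell's context if one exists
ssc = StreamingContext(sc, 10)  # 10-second batch interval (illustrative value)

directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "host.domain:9092"})
directKafkaStream.pprint()  # Spark Streaming requires at least one output operation before start()

ssc.start()
ssc.awaitTermination()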

There are a couple of similar posts with the same error. In all of them, the cause was an empty Kafka topic. My "test-topic" does contain messages; I can read them with

kafka-console-consumer --zookeeper host.domain:2181 --topic test-topic --from-beginning --max-messages 100

Does anyone know what might be the problem?

I'm using:

  • Spark 1.5.2 (Apache)
  • Kafka 0.8.2.0 + kafka 1.3.0 (CDH 5.4.7)

Recommended answer

You need to check 2 things:

  1. Check that this topic and partition exist. In your case, the topic is test-topic and the partition is 0.

  2. Based on your code, you are trying to consume messages from offset 0, and messages may no longer be available from offset 0 (for example, if older messages have already been deleted by the topic's retention policy). Check what your earliest offset is and consume from there.

Below is the command to check the earliest offset (--time -2 returns the earliest offset; --time -1 returns the latest):

sh kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "your broker list" --topic "topic name" --time -2
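The checks in this answer can be scripted. Below is a minimal sketch; the helper names are hypothetical and not part of Spark or Kafka. It assumes GetOffsetShell (run with --time -2) prints one topic:partition:offset line per partition, as it does with the default --offsets 1. It parses that output and raises an error if the partition the stream needs (test-topic, partition 0) was not reported, which would point back to the first check.

```python
# Minimal sketch with hypothetical helper names: parse GetOffsetShell output
# and confirm that the partition the stream needs exists.


def parse_offset_shell_output(output):
    """Map (topic, partition) -> offset from GetOffsetShell output.

    Assumes one "topic:partition:offset" line per partition, as printed with
    the default --offsets 1. Lines that do not match (log messages, lines with
    several comma-separated offsets) are skipped.
    """
    offsets = {}
    for line in output.splitlines():
        try:
            # Split from the right so only the last two fields are separated off.
            topic, partition, offset = line.strip().rsplit(":", 2)
            offsets[(topic, int(partition))] = int(offset)
        except ValueError:
            continue
    return offsets


def earliest_offset(offsets, topic, partition):
    """Return the offset reported for (topic, partition), or raise KeyError."""
    key = (topic, partition)
    if key not in offsets:
        raise KeyError("%s partition %d was not reported; check that it exists" % key)
    return offsets[key]


if __name__ == "__main__":
    # Hypothetical sample; feed in the real output of GetOffsetShell --time -2.
    sample = "test-topic:0:42\n"
    offsets = parse_offset_shell_output(sample)
    print(earliest_offset(offsets, "test-topic", 0))  # prints 42
```

The returned offset is where consumption can start: KafkaUtils.createDirectStream accepts a fromOffsets dictionary keyed by TopicAndPartition (both in pyspark.streaming.kafka). Alternatively, adding "auto.offset.reset": "smallest" to the Kafka parameters makes the direct stream start from the earliest available offset of each partition without looking the offsets up first.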
