Couldn't find leaders for Set([TOPICNNAME,0])) when using Apache Spark


Problem description

We are using Apache Spark 1.5.1 with kafka_2.10-0.8.2.1 and the Kafka DirectStream API to fetch data from Kafka.
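For reference, a minimal sketch of how a direct stream is typically created against the 0.8 API in Spark 1.5.x (the broker addresses, batch interval, and processing logic here are illustrative placeholders, not taken from our actual job):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-direct-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // "metadata.broker.list" points the direct consumer at the brokers.
    // The direct approach queries these brokers for partition leaders on
    // every batch, which is where the "Couldn't find leaders" error arises.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("normalized-tenant4")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Placeholder processing: print each message value.
    stream.foreachRDD(rdd => rdd.foreach { case (_, value) => println(value) })

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Running this requires the spark-streaming-kafka artifact for Spark 1.5.1 on the classpath and a reachable Kafka 0.8.2 cluster.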

We created the topics in Kafka with the following settings:

ReplicationFactor: 1 and Replicas: 1

When all of the Kafka instances are running, the Spark job works fine. However, when one of the Kafka instances in the cluster goes down, we get the exception reproduced below. Some time later we restarted the disabled Kafka instance and tried to resume the Spark job, but Spark had already terminated because of the exception. As a result, we could not read the remaining messages from the Kafka topics.

ERROR DirectKafkaInputDStream:125 - ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms
org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Thanks in advance. Please help to resolve this issue.

Recommended answer

This is expected behaviour. By setting ReplicationFactor to 1, you have requested that each topic be stored on exactly one machine. When the one machine that happens to store the topic normalized-tenant4 is taken down, the consumer cannot find a leader for the topic.

See http://kafka.apache.org/documentation.html#intro_guarantees.
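One way to avoid this is to create the topic with a replication factor greater than 1, so that when the leader's broker fails, another replica can be elected leader and the direct stream can keep reading. A rough sketch using the 0.8.2 admin API is below; the ZooKeeper address is a placeholder, and a replication factor of 3 assumes at least three live brokers:

```scala
import java.util.Properties

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

object CreateReplicatedTopicSketch {
  def main(args: Array[String]): Unit = {
    // ZooKeeper address and timeouts are placeholders for illustration.
    val zkClient = new ZkClient("zookeeper1:2181", 10000, 10000, ZKStringSerializer)
    try {
      // 1 partition, replication factor 3: the partition keeps a leader
      // as long as at least one of its three replicas is alive.
      AdminUtils.createTopic(zkClient, "normalized-tenant4", 1, 3, new Properties())
    } finally {
      zkClient.close()
    }
  }
}
```

The same effect can be achieved with the kafka-topics.sh command-line tool by passing a higher --replication-factor when creating the topic.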

