Cannot connect from Spark Streaming to Kafka: org.apache.spark.SparkException: java.net.SocketTimeoutException
Question
I'm trying to read from a Kafka topic with the Spark Streaming direct stream, but I receive the following error:
INFO consumer.SimpleConsumer: Reconnect due to socket error: java.net.SocketTimeoutException
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
I have Kafka 0.7.1 and Spark 1.5.2.
I am using the following code:
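import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// sparkContext is an existing SparkContext; each batch covers 60 seconds
val ssc: StreamingContext = new StreamingContext(sparkContext, Seconds(60))
val topicsSet = Set("myTopic")
val kafkaParams = Map[String, String]("metadata.broker.list" -> "mybrokerhostname1:9092,mybrokerhostname2:9092")
val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)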
I am sure that the topic already exists because other applications are correctly reading from it.
Recommended answer
Try not to use an older version of Kafka (0.7.1 in your case). If you have a strong reason to stay on 0.7.1, do let me know. Judging by the exception, the application is not able to connect to the Kafka brokers.
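To separate a plain network problem from a version problem, a quick TCP reachability check against each broker can help. The following is a minimal sketch, not part of the original answer: the object name BrokerCheck, its helper functions, and the 5-second timeout are hypothetical, and the broker list is copied from the question. It only shows whether the port accepts connections from the machine it runs on (ideally a node where the YARN application runs); it says nothing about whether the broker speaks a protocol the Spark client understands.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, for illustration only.
object BrokerCheck {
  // Parses "host1:9092,host2:9092" into (host, port) pairs.
  def parseBrokers(brokerList: String): Seq[(String, Int)] =
    brokerList.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      val idx = entry.lastIndexOf(':')
      (entry.substring(0, idx), entry.substring(idx + 1).toInt)
    }

  // Returns true if a TCP connection can be opened within timeoutMs.
  def canConnect(host: String, port: Int, timeoutMs: Int): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers timeouts, refused connections, and unresolvable host names.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Same value as "metadata.broker.list" in the question.
    val brokers = "mybrokerhostname1:9092,mybrokerhostname2:9092"
    parseBrokers(brokers).foreach { case (host, port) =>
      val status = if (canConnect(host, port, 5000)) "reachable" else "NOT reachable"
      println(s"$host:$port is $status")
    }
  }
}
```

If every broker is reachable and the SocketTimeoutException persists, a mismatch between the broker version and the Kafka client used by Spark becomes the more likely cause.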
I have used this direct stream API to read from Kafka 0.8.2: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
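For reference, a build definition for this combination might look like the sketch below. It is an assumption rather than something taken from the linked repository: the Scala version and the "provided" scope are illustrative choices. To the best of my knowledge, the spark-streaming-kafka artifact for Spark 1.5.2 is built against the Kafka 0.8.2.1 client, which is why 0.8.x brokers are the expected counterpart.

```scala
// build.sbt (sketch; only the Spark version 1.5.2 comes from the question)
scalaVersion := "2.10.5" // assumed; Spark 1.5.x is built for Scala 2.10 by default

libraryDependencies ++= Seq(
  // Supplied by the cluster at runtime, hence "provided" (an assumption about the deployment)
  "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
  // Contains KafkaUtils.createDirectStream and pulls in the Kafka client transitively
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2"
)
```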
I hope this solves your problem.
Thanks & Regards, Vikas Gite