Cannot connect from Spark Streaming to Kafka: org.apache.spark.SparkException: java.net.SocketTimeoutException

Question

I'm trying to read from a Kafka topic with a Spark Streaming direct stream, but I receive the following error:

INFO consumer.SimpleConsumer: Reconnect due to socket error: java.net.SocketTimeoutException
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)

I have Kafka 0.7.1 and Spark 1.5.2.

I'm using the following code:

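  import kafka.serializer.DefaultDecoder
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val ssc: StreamingContext = new StreamingContext(sparkContext, Seconds(60))
  val topicsSet = Set("myTopic")
  // Keep "(" on this line: a newline before it would end the statement early
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "mybrokerhostname1:9092,mybrokerhostname2:9092")

  val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)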
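The `metadata.broker.list` value in the snippet is a comma-separated list of `host:port` pairs. As a minimal sketch (`BrokerList.parse` is a hypothetical helper, not part of Spark or Kafka), splitting and validating that string up front can catch a malformed entry before it reaches `createDirectStream`:

```scala
object BrokerList {
  // Hypothetical helper: splits "host1:port1,host2:port2" into (host, port) pairs.
  // Blank entries are ignored; a malformed entry throws IllegalArgumentException
  // (NumberFormatException, thrown for a non-numeric port, is a subclass of it).
  def parse(list: String): List[(String, Int)] =
    list.split(",").toList.map(_.trim).filter(_.nonEmpty).map { entry =>
      val idx = entry.lastIndexOf(':')
      require(idx > 0 && idx < entry.length - 1, s"expected host:port, got '$entry'")
      val host = entry.substring(0, idx)
      val port = entry.substring(idx + 1).toInt
      require(port > 0 && port <= 65535, s"port out of range in '$entry'")
      (host, port)
    }

  def main(args: Array[String]): Unit =
    // Placeholder hostnames from the question
    parse("mybrokerhostname1:9092,mybrokerhostname2:9092").foreach { case (host, port) =>
      println(s"$host -> $port")
    }
}
```

Running `main` prints one `host -> port` line per broker; a typo such as a missing port fails immediately with a descriptive message instead of surfacing later as a connection error.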

I am sure that the topic already exists because other applications are correctly reading from it.

Answer

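Try not to use an old version of Kafka; in your case it is 0.7.1. If you have a strong reason to stay on 0.7.1, do let me know. Judging by your exception, the application is not able to connect to the Kafka brokers. The Kafka integration shipped with Spark 1.5.2 is built against Kafka 0.8.2.1, and Kafka 0.8 clients are not compatible with 0.7 brokers, so the direct stream cannot talk to a 0.7.1 broker even when the network itself is fine.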
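Because the exception points at broker connectivity, a plain TCP check can help separate a network problem (DNS, routing, firewall) from a version problem. The job failed inside the YARN ApplicationMaster, so the check is most meaningful when run on a cluster node. This is a minimal sketch: `BrokerReachability` and `canConnect` are hypothetical names, the hostnames are the placeholders from the question, and a successful connect only proves the port is open, not that the broker speaks the protocol version the Spark client expects.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object BrokerReachability {
  // Returns true if a TCP connection to host:port is established within timeoutMs.
  // Checks network reachability only; no Kafka request is sent.
  def canConnect(host: String, port: Int, timeoutMs: Int): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers SocketTimeoutException, ConnectException and UnknownHostException
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Placeholder brokers from the question; replace with your own.
    val brokers = List(("mybrokerhostname1", 9092), ("mybrokerhostname2", 9092))
    for ((host, port) <- brokers) {
      val status = if (canConnect(host, port, 5000)) "reachable" else "NOT reachable"
      println(s"$host:$port is $status")
    }
  }
}
```

If every broker is reachable and the stream still times out, a client/broker version mismatch becomes the more likely explanation.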

I have used this direct stream API to read from Kafka 0.8.2: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala

Hope this solves your problem.

Thanks & Regards, Vikas Gite