Cannot connect from Spark Streaming to Kafka: org.apache.spark.SparkException: java.net.SocketTimeoutException

Problem description

I'm trying to read from a Kafka topic using the Spark Streaming direct stream API (KafkaUtils.createDirectStream), but I receive the following error:

INFO consumer.SimpleConsumer: Reconnect due to socket error: java.net.SocketTimeoutException
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)

I have Kafka 0.7.1 and Spark 1.5.2.

I am using the following code:

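  import kafka.serializer.DefaultDecoder
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val ssc: StreamingContext = new StreamingContext(sparkContext, Seconds(60))
  val topicsSet = Set("myTopic")
  // Open the argument list on this line; a "(" on the next line would start a new statement
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "mybrokerhostname1:9092,mybrokerhostname2:9092")

  val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
    ssc, kafkaParams, topicsSet)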

I am sure that the topic already exists because other applications are correctly reading from it.

Recommended answer

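Avoid old Kafka versions such as the 0.7.1 you are running; if you have a strong reason to stay on 0.7.1, let me know. Judging from the exception, the application cannot connect to the Kafka brokers. The version gap itself is a likely cause: the Kafka integration shipped with Spark 1.5.2 is built against Kafka 0.8.2.1, and the 0.8 protocol is not backward compatible with 0.7 brokers.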
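
One way to test the "cannot connect to the brokers" diagnosis is to check, from the machine where the driver runs, whether each broker accepts TCP connections at all. The log line from yarn.ApplicationMaster suggests the driver runs inside the YARN ApplicationMaster (cluster mode), so the check should run on a cluster node rather than on your workstation. The sketch below is a hypothetical helper, BrokerCheck, built only on java.net.Socket; it is not part of Spark or Kafka. If every broker is reachable and createDirectStream still times out, the client and broker are probably not speaking the same protocol version, which points back to the 0.7.1 brokers.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical diagnostic helper (not a Spark or Kafka API): checks whether each broker
// in a "metadata.broker.list"-style string accepts a plain TCP connection.
object BrokerCheck {

  // Split "host1:9092,host2:9092" into (host, port) pairs, ignoring blanks.
  def parseBrokerList(brokerList: String): Seq[(String, Int)] =
    brokerList.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { hostPort =>
      val idx = hostPort.lastIndexOf(':')
      require(idx > 0, s"expected host:port, got '$hostPort'")
      (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
    }

  // Right(()) if a TCP connection opens within timeoutMs, otherwise Left(error description).
  // Unknown hosts, refused connections and timeouts all surface as IOException subclasses.
  def checkBroker(host: String, port: Int, timeoutMs: Int = 5000): Either[String, Unit] = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      Right(())
    } catch {
      case e: IOException => Left(s"${e.getClass.getSimpleName}: ${e.getMessage}")
    } finally {
      socket.close()
    }
  }

  // Check every broker in the list; keys are "host:port".
  def checkAll(brokerList: String, timeoutMs: Int = 5000): Map[String, Either[String, Unit]] =
    parseBrokerList(brokerList).map { case (host, port) =>
      s"$host:$port" -> checkBroker(host, port, timeoutMs)
    }.toMap
}
```

For example, BrokerCheck.checkAll("mybrokerhostname1:9092,mybrokerhostname2:9092") returns a map from each broker to Right(()) or to a Left carrying the exception name and message. A successful TCP connect only proves network reachability; it says nothing about whether the broker understands the 0.8 wire protocol.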

I have used the direct stream API to read from Kafka 0.8.2, as in this example: https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
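
After upgrading the brokers to 0.8.2, the application only needs the Kafka integration artifact that matches its Spark version. A minimal build.sbt sketch, assuming sbt and Scala 2.10 (the Scala version the prebuilt Spark 1.5.x distributions use); adjust the versions to your cluster:

```scala
// build.sbt sketch; versions are assumptions matching the Spark 1.5.2 setup in the question
scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  // Provided by the cluster at runtime, so not bundled into the application jar
  "org.apache.spark" %% "spark-streaming"       % "1.5.2" % "provided",
  // Direct stream support (KafkaUtils.createDirectStream); brings in the Kafka 0.8.2.1 client
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2"
)
```

Because spark-streaming-kafka is not part of the Spark runtime on the cluster, it has to be packaged into the application jar (for example with an assembly plugin) or passed via --packages at submit time.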

Hope this solves your problem.

Thanks & Regards, Vikas Gite