使用Spark结构化流从Kafka读取数据,总是会发生超时问题 [英] Using Spark Structured Streaming to Read Data From Kafka, Issue of Over-time is Always Occured
问题描述
这是我用来通过Spark结构化流从Kafka读取数据的代码,
Here is the code I used to read data from Kafka By using Spark Structured Streaming,
//ss:SparkSession is defined before.
import ss.implicits._
val df = ss
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_server)
.option("subscribe", topic_input)
.option("startingOffsets", "latest")
.option("kafkaConsumer.pollTimeoutMs", "5000")
.option("failOnDataLoss", "false")
.load()
这是错误代码
Caused by: java.util.concurrent.TimeoutException: Cannot fetch record xxxx for offset in 5000 milliseconds
如果将5000放大到10000,仍然会发生此错误.我用谷歌搜索了这个问题.似乎没有关于此问题的太多相关信息.
If I enlarge the 5000 to 10000, this error still happens. And I google this qquestion by Google. It seems there no much related info about this Issue.
这是sbt文件中与此问题相关的部分.
Here is the part of sbt file related to this issue.
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude ("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
推荐答案
我也收到了此错误.
我查看了KafkaSourceRDD的源代码,一无所获.
I viewed the source code of KafkaSourceRDD, got nothing.
我猜想kafka连接器出了点问题,因此我在"spark-sql-kafka-0-10_2.11"包中排除了kafka-client,并添加了新的依赖项,如下所示:
I guess something is wrong with the kafka connector, thus I excluded the kafka-client in the "spark-sql-kafka-0-10_2.11" package, and add a new dependency, like this:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
现在可以使用了.希望对您有所帮助.
Now it works. Hope it helps.
我创建了一个jira问题来报告此问题: https://issues.apache.org/jira/browse/SPARK-23829
I created a jira issue to report this problem: https://issues.apache.org/jira/browse/SPARK-23829
2018年12月17日更新:Spark 2.4和Kafka2.0解决了该问题.
Update on 12/17/2018: Spark 2.4 and Kafka2.0 resolves the problem.
这篇关于使用Spark结构化流从Kafka读取数据,总是会发生超时问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!