Using Spark Structured Streaming to Read Data From Kafka, a Timeout Error Always Occurs


Problem Description

Here is the code I use to read data from Kafka with Spark Structured Streaming:

// ss: SparkSession is defined before.
import ss.implicits._

val df = ss
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_server)   // Kafka broker list
  .option("subscribe", topic_input)                  // topic to consume
  .option("startingOffsets", "latest")
  .option("kafkaConsumer.pollTimeoutMs", "5000")     // executor-side poll timeout in milliseconds
  .option("failOnDataLoss", "false")
  .load()
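For context, the snippet above only defines the source; the Kafka poll (and therefore the timeout) happens once a streaming query is started. A minimal sketch of starting the query, assuming a console sink purely for illustration (this part is not in the original question):

// Hypothetical sketch: start the streaming query with a console sink so the Kafka poll actually runs.
val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination()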

Here is the error:

  Caused by: java.util.concurrent.TimeoutException: Cannot fetch record xxxx for offset in 5000 milliseconds

If I enlarge the 5000 to 10000, the error still happens. I googled this question, and there seems to be very little related information about this issue.

Here is the part of the sbt file related to this issue:

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude ("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"

Recommended Answer

I also ran into this error.

I looked through the source code of KafkaSourceRDD and found nothing.

I guess something is wrong with the Kafka connector, so I excluded kafka-clients from the "spark-sql-kafka-0-10_2.11" package and added a new dependency, like this:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
</dependency>
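If your build uses sbt, as in the question, a hypothetical equivalent of this Maven change would keep the same exclusion but pin kafka-clients to 0.10.2.1 instead of 0.11.0.0:

// Hypothetical sbt equivalent of the Maven exclusion above
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude ("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"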

Now it works. Hope it helps.

I created a JIRA issue to report this problem: https://issues.apache.org/jira/browse/SPARK-23829

Update on 12/17/2018: Spark 2.4 and Kafka 2.0 resolve the problem.
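For an sbt build, a hedged sketch of the corresponding upgrade (assuming Spark 2.4.0 and the 2.0.x kafka-clients line; with a newer connector the explicit kafka-clients pin may no longer be needed at all) would be:

// Hypothetical sbt upgrade, assuming Spark 2.4.0 and kafka-clients 2.0.0
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"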

