Spark Streaming application fails with KafkaException: String exceeds the maximum size or with IllegalArgumentException

Question

TL;DR:

My very simple Spark Streaming application fails in the driver with "KafkaException: String exceeds the maximum size". I see the same exception in the executor, but I also found, further down the executor's logs, an IllegalArgumentException with no other information in it.

Full problem:

I'm using Spark Streaming to read some messages from a Kafka topic. This is what I'm doing:

import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("testName")
val streamingContext = new StreamingContext(new SparkContext(conf), Milliseconds(millis))
val kafkaParams = Map(
  "metadata.broker.list" -> "somevalidaddresshere:9092",
  "auto.offset.reset" -> "largest"
)
val topics = Set("data")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext,
  kafkaParams,
  topics
).map(_._2) // only need the values, not the keys

All I'm doing with the Kafka data is printing it with:

stream.print()

My application obviously has more code than this, but in order to locate the problem I stripped everything I possibly could from the code.

I'm trying to run this code on YARN. This is my spark-submit line:

./spark-submit --class com.somecompany.stream.MainStream --master yarn --deploy-mode cluster myjar.jar hdfs://some.hdfs.address.here/user/spark/streamconfig.properties

The streamconfig.properties file is just a regular properties file and is probably irrelevant to the problem here.

After I try to execute the application, it fails pretty quickly with the following exception on the driver:

16/05/10 06:15:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, some.hdfs.address.here): kafka.common.KafkaException: String exceeds the maximum size of 32767.
    at kafka.api.ApiUtils$.shortStringLength(ApiUtils.scala:73)
    at kafka.api.TopicData$.headerSize(FetchResponse.scala:107)
    at kafka.api.TopicData.<init>(FetchResponse.scala:113)
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:103)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I don't even see my code in the stack trace.

Examining the executor logs, I found the same exception as in the driver, but buried deep down is also the following exception:

16/05/10 06:40:47 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 8)
java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100)
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I have no idea what the IllegalArgumentException is about, since no information is included.

The Spark version my YARN is using is 1.6.0. I also verified that my pom contains Spark 1.6.0 and not an earlier version. Its scope is "provided".

I manually read the data from the exact same topic, and it is just plain JSON. The data is not huge at all, definitely smaller than 32767 bytes. Also, I'm able to read this data using the regular command-line consumer, which is strange.

Sadly, googling this exception didn't provide any useful information.

Does anyone have any idea what exactly the problem is here?

Thanks in advance.

Answer

After a lot of digging I think I found what the problem was. I'm running Spark on YARN (1.6.0-cdh5.7.0). Cloudera ships the newer Kafka client (version 0.9), whose wire protocol changed from earlier versions, but our Kafka brokers are version 0.8.2. Presumably the fetch responses are then parsed against the wrong format, so a length field decodes to a nonsensical value and trips the 32767 short-string check (and, on the executor, the IllegalArgumentException from Buffer.limit).
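
If that mismatch is indeed the culprit, one thing worth trying is making the Kafka client version explicit in the build instead of relying on whatever the cluster provides. A minimal sketch, shown in sbt form for brevity (the project above uses Maven, but the coordinates are the same), assuming Scala 2.10 artifacts and 0.8.2 brokers:

// build dependencies (sketch): pin the Kafka client to the broker version
// instead of picking up the CDH-provided 0.9 client
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0",
  // assumption: 0.8.2.2 client to match the 0.8.2 brokers
  "org.apache.kafka" %% "kafka"                 % "0.8.2.2"
)

Whether this actually takes effect depends on the cluster classpath; if the CDH assembly's Kafka classes still win, alternatives to investigate would be setting spark.executor.userClassPathFirst or upgrading the brokers to 0.9.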
