Spark Streaming application fails with KafkaException: String exceeds the maximum size or with IllegalArgumentException


Problem description

TL;DR:

My very simple Spark Streaming application fails in the driver with "KafkaException: String exceeds the maximum size". I see the same exception in the executor, but I also found, further down the executor's logs, an IllegalArgumentException with no other information in it.

Full problem:

I'm using Spark Streaming to read some messages from a Kafka topic. This is what I'm doing:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("testName")
val streamingContext = new StreamingContext(new SparkContext(conf), Milliseconds(millis))
val kafkaParams = Map(
  "metadata.broker.list" -> "somevalidaddresshere:9092",
  "auto.offset.reset" -> "largest"
)
val topics = Set("data")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext,
  kafkaParams,
  topics
).map(_._2) // only need the values, not the keys

What I'm doing with the Kafka data is only printing it using:

stream.print()

My application obviously has more code than this, but in order to locate my problem I stripped everything I possibly could from the code.

I'm trying to run this code on YARN. This is my spark-submit line:

./spark-submit --class com.somecompany.stream.MainStream --master yarn --deploy-mode cluster myjar.jar hdfs://some.hdfs.address.here/user/spark/streamconfig.properties

The streamconfig.properties file is just a regular properties file which is probably irrelevant to the problem here

After trying to execute the application it fails pretty quickly with the following exception on the driver:

16/05/10 06:15:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, some.hdfs.address.here): kafka.common.KafkaException: String exceeds the maximum size of 32767.
    at kafka.api.ApiUtils$.shortStringLength(ApiUtils.scala:73)
    at kafka.api.TopicData$.headerSize(FetchResponse.scala:107)
    at kafka.api.TopicData.<init>(FetchResponse.scala:113)
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:103)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I don't even see my code in the stack trace
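
For context on the 32767 in the message: Kafka's old wire format length-prefixes strings with a signed 16-bit short, so Short.MaxValue (32767) is a hard ceiling. Here is a minimal sketch of that check (my own illustration, not Kafka's actual source):

```scala
// Illustrative only: mirrors the idea behind kafka.api.ApiUtils.shortStringLength,
// which rejects any string whose UTF-8 encoding exceeds Short.MaxValue bytes.
object ShortStringCheck {
  def shortStringLength(s: String): Int = {
    val encoded = s.getBytes("UTF-8")
    if (encoded.length > Short.MaxValue) // Short.MaxValue == 32767
      throw new RuntimeException(
        s"String exceeds the maximum size of ${Short.MaxValue}.")
    2 + encoded.length // 2-byte length prefix + payload
  }

  def main(args: Array[String]): Unit = {
    println(shortStringLength("data")) // a normal topic name is tiny
    try shortStringLength("x" * 40000)
    catch { case e: RuntimeException => println(e.getMessage) }
  }
}
```

A topic name could never legitimately trip this, which suggests the length field itself is being misread, i.e. the consumer is decoding bytes that are not where it expects them to be.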

Examining the executor's logs, I found the same exception as in the driver, but buried deeper down was also the following exception:

16/05/10 06:40:47 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 8)
java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100)
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I have no idea what the IllegalArgumentException refers to, since no information is included in it.
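
One hedged guess about where the empty IllegalArgumentException comes from: the top frame is java.nio.Buffer.limit, which on Java 8 throws a bare IllegalArgumentException (no message) whenever the new limit is negative or larger than the buffer's capacity. A sketch (an assumption about the failure mode, not a diagnosis of the actual bytes involved):

```scala
import java.nio.ByteBuffer

// Buffer.limit rejects a negative or oversized limit; on Java 8 the
// exception carries no message at all, matching the empty log entry.
object BufferLimitDemo {
  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.allocate(16)
    val bogusSize = -1 // e.g. a size field decoded from misaligned bytes
    try buf.limit(bogusSize)
    catch {
      case _: IllegalArgumentException =>
        println("caught IllegalArgumentException")
    }
  }
}
```

A mis-parsed size field, which is exactly what a protocol-version mismatch would produce, fits this pattern.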

The Spark version my YARN cluster is using is 1.6.0. I also verified that my pom contains Spark 1.6.0 and not an earlier version. My scope is "provided".

I manually read the data from the exact same topic, and it is just plain JSON. The messages are not huge at all, definitely smaller than 32767 bytes. I'm also able to read this data using the regular command-line consumer, so that's weird.

Googling this exception sadly didn't provide any useful information

Does anyone have any idea on how to understand what exactly is the problem here?

Thanks in advance

Answer

After a lot of digging, I think I found what the problem was. I'm running Spark on YARN (1.6.0-cdh5.7.0). Cloudera ships the new Kafka client (version 0.9), which introduced a wire-protocol change from earlier versions. However, our Kafka brokers are version 0.8.2.
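
As a hedged sketch of a fix (artifact names and versions here are assumptions on my part; match them to your own cluster), one way to keep the broker-compatible client on the classpath is to pin it explicitly in the build:

```scala
// build.sbt fragment (hypothetical versions; adjust to your cluster)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0",
  // force the 0.8.2 client that matches the brokers, instead of the
  // CDH-provided 0.9 client:
  "org.apache.kafka" %% "kafka" % "0.8.2.2"
)
```

The underlying point is that a 0.9 consumer talking to 0.8.2 brokers can misinterpret the fetch response, producing exactly the kind of garbage length fields seen in the stack traces above.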
