Spark job failing in YARN mode

Problem description

I have a Spark program written in Scala that reads a CSV file from HDFS, computes a new column and saves the result as a Parquet file. I am running the program on a YARN cluster, but every time I try to launch it the executors fail at some point with the error below.

Could you help me find what might be causing this error?

Log from one executor:

16/10/27 15:58:10 WARN storage.BlockManager: Putting block rdd_12_225 failed due to an exception
16/10/27 15:58:10 WARN storage.BlockManager: Block rdd_12_225 could not be removed as it was not found on disk or in memory
16/10/27 15:58:10 ERROR executor.Executor: Exception in task 225.0 in stage 4.0 (TID 465)
java.io.IOException: Stream is corrupted
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
    at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.readSize(UnsafeRowSerializer.scala:113)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.<init>(UnsafeRowSerializer.scala:120)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3.asKeyValueIterator(UnsafeRowSerializer.scala:110)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:66)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:62)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryRelation.scala:118)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryRelation.scala:110)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:214)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 15385 of input buffer
    at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:205)
    ... 41 more

EDIT:

Here is the code used:

var df = spark.read.option("header", "true").option("inferSchema", "true").option("treatEmptyValuesAsNulls", "true").csv(hdfsFileURLIn).repartition(nPartitions)
df.printSchema()
df = df.withColumn("ipix", a2p(df.col(deName), df.col(raName))).persist(StorageLevel.MEMORY_AND_DISK)
df.repartition(nPartitions, $"ipix").write.mode("overwrite").option("spark.hadoop.dfs.replication", 1).parquet(hdfsFileURLOut)

The user-defined function a2p simply takes two Doubles and returns another Double.
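
For reference, a column function of that shape would typically be declared as a Spark UDF roughly as follows. This is only a sketch: the actual body of a2p is not shown in the question, so the computation below is a placeholder.

import org.apache.spark.sql.functions.udf

// Placeholder body; only the (Double, Double) => Double signature matches the description above.
val a2p = udf((de: Double, ra: Double) => de + ra)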

I should mention that this worked well with relatively small CSV files (~1 GB), but the error happens every time with bigger ones (~15 GB).

EDIT 2: Following the suggestions, I disabled the repartition and used StorageLevel.DISK_ONLY.
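
Concretely, and assuming it is the first repartition (the one right after the read) that was disabled, the modified pipeline looks roughly like this:

// Sketch of the EDIT 2 variant: no repartition after the read, disk-only persistence
var df = spark.read.option("header", "true").option("inferSchema", "true").option("treatEmptyValuesAsNulls", "true").csv(hdfsFileURLIn)
df = df.withColumn("ipix", a2p(df.col(deName), df.col(raName))).persist(StorageLevel.DISK_ONLY)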

With this I no longer get the "Putting block rdd_***** failed due to an exception" warning, but there is still an exception related to LZ4 (Stream is corrupted):

16/10/28 07:53:00 ERROR util.Utils: Aborting task
java.io.IOException: Stream is corrupted
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
    at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.spark_project.guava.io.ByteStreams.read(ByteStreams.java:899)
    at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:254)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 12966 of input buffer
    at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:205)
    ... 25 more

EDIT 3: I managed to launch it without any errors by also removing the second repartition (the one that repartitions by the column ipix). I will look further into the documentation of this method.
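
In other words, the write step that succeeds is the plain one, without the column-based repartition, roughly:

// Sketch of the EDIT 3 variant: write directly, without repartitioning by "ipix"
df.write.mode("overwrite").option("spark.hadoop.dfs.replication", 1).parquet(hdfsFileURLOut)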

EDIT 4: This is strange; occasionally some executors fail with a segmentation fault:

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f48d8a47f2c, pid=3501, tid=0x00007f48cc60c700
#
# JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 1.8.0_102-b14)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# J 4713 C2 org.apache.spark.unsafe.types.UTF8String.hashCode()I (18 bytes) @ 0x00007f48d8a47f2c [0x00007f48d8a47e60+0xcc]
#
# Core dump written. Default location: /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1477580152295_0008/container_1477580152295_0008_01_000006/core or core.3501
#
# An error report file with more information is saved as:
# /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1477580152295_0008/container_1477580152295_0008_01_000006/hs_err_pid3501.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#

I checked the memory, and all my executors always have plenty of free memory (at least 6 GB).

EDIT 5: So I tested with multiple files, and the execution always succeeds, but sometimes some executors fail (with the error above) and are restarted by YARN.

Solution

Ran into the same issue.

Symptoms look exactly like this problem: SPARK-18105.

As of 1/29/17 it is not fixed yet.
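
Until it is resolved, a workaround that is commonly tried for this kind of shuffle-decompression corruption is to switch Spark's I/O compression codec away from LZ4 (for example to snappy or lzf) via spark.io.compression.codec. This is only a suggestion, not a confirmed fix for SPARK-18105:

import org.apache.spark.sql.SparkSession

// Sketch: use a non-LZ4 codec for shuffle/spill compression
val spark = SparkSession.builder()
  .config("spark.io.compression.codec", "snappy")
  .getOrCreate()

The same setting can also be passed on the command line with --conf spark.io.compression.codec=snappy.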
