Spark 1.0.2 (also 1.1.0) hangs on a partition

Problem Description

I've got a weird problem in Apache Spark and I would appreciate some help. After reading data from HDFS (and doing some conversion from JSON to objects), the next stage (processing said objects) fails after 2 partitions have been processed (out of 512 in total). This happens on large-ish datasets (the smallest I have noticed is about 700 megs, but it could be lower; I haven't narrowed it down yet).

EDIT: 700 megs is the tgz file size; uncompressed it's 6 gigs.
EDIT 2: The same thing happens on Spark 1.1.0.

I'm running Spark with a local master, on a 32-core, 60-gig machine, with the following settings:

spark.akka.timeout = 200
spark.shuffle.consolidateFiles = true
spark.kryoserializer.buffer.mb = 128
spark.reducer.maxMbInFlight = 128

with 16 gig executor heap size. Memory is not maxed out and CPU load is negligible. Spark just hangs, forever.
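
For context, a minimal sketch of how a setup like this might be wired up with Spark 1.x's Java API. The app name and master string below are assumptions, and in local mode the 16 gig heap would normally come from spark-submit --driver-memory 16g rather than a Spark property:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ConfigSketch {
    public static void main(String[] args) {
        // Mirrors the settings listed above; local master on the 32-core box.
        // In local mode the executor lives inside the driver JVM, so the
        // 16 gig heap is typically passed as: spark-submit --driver-memory 16g
        SparkConf conf = new SparkConf()
                .setAppName("spew")          // assumed name
                .setMaster("local[32]")      // assumed master string
                .set("spark.akka.timeout", "200")
                .set("spark.shuffle.consolidateFiles", "true")
                .set("spark.kryoserializer.buffer.mb", "128")
                .set("spark.reducer.maxMbInFlight", "128");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... job logic ...
        sc.stop();
    }
}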

Below is the Spark log:

14/09/11 10:19:52 INFO HadoopRDD: Input split: hdfs://localhost:9000/spew/data/json.lines:6351070299+12428842
14/09/11 10:19:53 INFO Executor: Serialized size of result for 511 is 1263
14/09/11 10:19:53 INFO Executor: Sending result for 511 directly to driver
14/09/11 10:19:53 INFO Executor: Finished task ID 511
14/09/11 10:19:53 INFO TaskSetManager: Finished TID 511 in 868 ms on localhost (progress: 512/512)
14/09/11 10:19:53 INFO DAGScheduler: Completed ShuffleMapTask(3, 511)
14/09/11 10:19:53 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
14/09/11 10:19:53 INFO DAGScheduler: Stage 3 (mapToPair at Main.java:205) finished in 535.874 s
14/09/11 10:19:53 INFO DAGScheduler: looking for newly runnable stages
14/09/11 10:19:53 INFO DAGScheduler: running: Set()
14/09/11 10:19:53 INFO DAGScheduler: waiting: Set(Stage 0, Stage 1, Stage 2)
14/09/11 10:19:53 INFO DAGScheduler: failed: Set()
14/09/11 10:19:53 INFO DAGScheduler: Missing parents for Stage 0: List(Stage 1)
14/09/11 10:19:53 INFO DAGScheduler: Missing parents for Stage 1: List(Stage 2)
14/09/11 10:19:53 INFO DAGScheduler: Missing parents for Stage 2: List()
14/09/11 10:19:53 INFO DAGScheduler: Submitting Stage 2 (FlatMappedRDD[10] at flatMapToPair at Driver.java:145), which is now runnable
14/09/11 10:19:53 INFO DAGScheduler: Submitting 512 missing tasks from Stage 2 (FlatMappedRDD[10] at flatMapToPair at Driver.java:145)
14/09/11 10:19:53 INFO TaskSchedulerImpl: Adding task set 2.0 with 512 tasks
14/09/11 10:19:53 INFO TaskSetManager: Starting task 2.0:0 as TID 512 on executor localhost: localhost (PROCESS_LOCAL)
14/09/11 10:19:53 INFO TaskSetManager: Serialized task 2.0:0 as 3469 bytes in 0 ms
14/09/11 10:19:53 INFO Executor: Running task ID 512
14/09/11 10:19:53 INFO BlockManager: Found block broadcast_0 locally
14/09/11 10:19:53 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 134217728, targetRequestSize: 26843545
14/09/11 10:19:53 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 512 non-empty blocks out of 512 blocks
14/09/11 10:19:53 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 6 ms
14/09/11 10:20:07 INFO Executor: Serialized size of result for 512 is 1479
14/09/11 10:20:07 INFO Executor: Sending result for 512 directly to driver
14/09/11 10:20:07 INFO Executor: Finished task ID 512
14/09/11 10:20:07 INFO TaskSetManager: Starting task 2.0:1 as TID 513 on executor localhost: localhost (PROCESS_LOCAL)
14/09/11 10:20:07 INFO TaskSetManager: Serialized task 2.0:1 as 3469 bytes in 0 ms
14/09/11 10:20:07 INFO Executor: Running task ID 513
14/09/11 10:20:07 INFO TaskSetManager: Finished TID 512 in 13996 ms on localhost (progress: 1/512)
14/09/11 10:20:07 INFO DAGScheduler: Completed ShuffleMapTask(2, 0)
14/09/11 10:20:07 INFO BlockManager: Found block broadcast_0 locally
14/09/11 10:20:07 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 134217728, targetRequestSize: 26843545
14/09/11 10:20:07 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 512 non-empty blocks out of 512 blocks
14/09/11 10:20:07 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/09/11 10:20:15 INFO Executor: Serialized size of result for 513 is 1479
14/09/11 10:20:15 INFO Executor: Sending result for 513 directly to driver
14/09/11 10:20:15 INFO Executor: Finished task ID 513
14/09/11 10:20:15 INFO TaskSetManager: Starting task 2.0:2 as TID 514 on executor localhost: localhost (PROCESS_LOCAL)
14/09/11 10:20:15 INFO TaskSetManager: Serialized task 2.0:2 as 3469 bytes in 0 ms
14/09/11 10:20:15 INFO Executor: Running task ID 514
14/09/11 10:20:15 INFO TaskSetManager: Finished TID 513 in 7768 ms on localhost (progress: 2/512)
14/09/11 10:20:15 INFO DAGScheduler: Completed ShuffleMapTask(2, 1)
14/09/11 10:20:15 INFO BlockManager: Found block broadcast_0 locally
14/09/11 10:20:15 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 134217728, targetRequestSize: 26843545
14/09/11 10:20:15 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 512 non-empty blocks out of 512 blocks
14/09/11 10:20:15 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms

1) What does DAGScheduler: failed: Set() mean? I assume it's not critical since it's INFO level, but you never know.

2) What does Missing parents mean? Again, it's INFO.

This is the output of jstack:

"Service Thread" #20 daemon prio=9 os_prio=0 tid=0x00007f39400ff000 nid=0x10560 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread14" #19 daemon prio=9 os_prio=0 tid=0x00007f39400fa000 nid=0x1055f waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread13" #18 daemon prio=9 os_prio=0 tid=0x00007f39400f8000 nid=0x1055e waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread12" #17 daemon prio=9 os_prio=0 tid=0x00007f39400f6000 nid=0x1055d waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread11" #16 daemon prio=9 os_prio=0 tid=0x00007f39400f4000 nid=0x1055c waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread10" #15 daemon prio=9 os_prio=0 tid=0x00007f39400f1800 nid=0x1055b waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread9" #14 daemon prio=9 os_prio=0 tid=0x00007f39400ef800 nid=0x1055a waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread8" #13 daemon prio=9 os_prio=0 tid=0x00007f39400ed800 nid=0x10559 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread7" #12 daemon prio=9 os_prio=0 tid=0x00007f39400eb800 nid=0x10558 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread6" #11 daemon prio=9 os_prio=0 tid=0x00007f39400e9800 nid=0x10557 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread5" #10 daemon prio=9 os_prio=0 tid=0x00007f39400e7800 nid=0x10556 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread4" #9 daemon prio=9 os_prio=0 tid=0x00007f39400dd000 nid=0x10555 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread3" #8 daemon prio=9 os_prio=0 tid=0x00007f39400db000 nid=0x10554 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007f39400d8800 nid=0x10553 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f39400d7000 nid=0x10552 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f39400d4000 nid=0x10551 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f39400d2000 nid=0x10550 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f39400a2800 nid=0x1054f in Object.wait() [0x00007f38d39f8000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:142)
    - locked <0x00000000e0038180> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:158)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f39400a0800 nid=0x1054e in Object.wait() [0x00007f38d3af9000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
    - locked <0x00000000e00161b8> (a java.lang.ref.Reference$Lock)

"main" #1 prio=5 os_prio=0 tid=0x00007f394000a000 nid=0x10535 in Object.wait() [0x00007f3945ee1000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000000e03df000> (a org.apache.spark.scheduler.JobWaiter)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
    - locked <0x00000000e03df000> (a org.apache.spark.scheduler.JobWaiter)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:452)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1051)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1069)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1083)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1097)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:716)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:294)
    at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:44)
    at spew.Driver.run(Driver.java:88)
    at spew.Main.main(Main.java:92)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

"VM Thread" os_prio=0 tid=0x00007f3940099800 nid=0x1054d runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f394001f800 nid=0x10536 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f3940021000 nid=0x10537 runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f3940023000 nid=0x10538 runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f3940024800 nid=0x10539 runnable 

"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x00007f3940026800 nid=0x1053a runnable 

"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x00007f3940028000 nid=0x1053b runnable 

"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x00007f394002a000 nid=0x1053c runnable 

"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x00007f394002b800 nid=0x1053d runnable 

"GC task thread#8 (ParallelGC)" os_prio=0 tid=0x00007f394002d000 nid=0x1053e runnable 

"GC task thread#9 (ParallelGC)" os_prio=0 tid=0x00007f394002f000 nid=0x1053f runnable 

"GC task thread#10 (ParallelGC)" os_prio=0 tid=0x00007f3940030800 nid=0x10540 runnable 

"GC task thread#11 (ParallelGC)" os_prio=0 tid=0x00007f3940032800 nid=0x10541 runnable 

"GC task thread#12 (ParallelGC)" os_prio=0 tid=0x00007f3940034000 nid=0x10542 runnable 

"GC task thread#13 (ParallelGC)" os_prio=0 tid=0x00007f3940036000 nid=0x10543 runnable 

"GC task thread#14 (ParallelGC)" os_prio=0 tid=0x00007f3940037800 nid=0x10544 runnable 

"GC task thread#15 (ParallelGC)" os_prio=0 tid=0x00007f3940039800 nid=0x10545 runnable 

"GC task thread#16 (ParallelGC)" os_prio=0 tid=0x00007f394003b000 nid=0x10546 runnable 

"GC task thread#17 (ParallelGC)" os_prio=0 tid=0x00007f394003d000 nid=0x10547 runnable 

"GC task thread#18 (ParallelGC)" os_prio=0 tid=0x00007f394003e800 nid=0x10548 runnable 

"GC task thread#19 (ParallelGC)" os_prio=0 tid=0x00007f3940040800 nid=0x10549 runnable 

"GC task thread#20 (ParallelGC)" os_prio=0 tid=0x00007f3940042000 nid=0x1054a runnable 

"GC task thread#21 (ParallelGC)" os_prio=0 tid=0x00007f3940044000 nid=0x1054b runnable 

"GC task thread#22 (ParallelGC)" os_prio=0 tid=0x00007f3940045800 nid=0x1054c runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007f3940102000 nid=0x10561 waiting on condition 

JNI global references: 422

Has anyone had issues like this with Spark? It's odd, because it works for small (tiny) datasets (test fixtures, etc.).

Recommended Answer

I'm not sure how to answer for such an old Spark version, but I do know that when an application hangs like this, it is probably due to your resources being killed (by YARN, for example).

I had a similar problem in "Is Spark's KMeans unable to handle bigdata?". The best thing you can do is fine-tune your application, since there isn't enough information in your question to suggest a specific fix.

You could also fine-tune the number of partitions, using the usual rule of thumb of a few tasks per CPU core; see the sketch below.
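
A hedged sketch of what that might look like with the Java API used in the question. The input path is taken from the log above; the 2-3 tasks per core heuristic is from the Spark tuning guide, and everything else is illustrative:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RepartitionSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("repartition-sketch").setMaster("local[32]"));

        // Path taken from the HadoopRDD input split in the log above.
        JavaRDD<String> lines = sc.textFile("hdfs://localhost:9000/spew/data/json.lines");

        // Tuning-guide rule of thumb: roughly 2-3 tasks per CPU core.
        // With the 32 cores above, that is 64-96 partitions,
        // well below the 512 the job currently uses.
        int numPartitions = 32 * 3;
        JavaRDD<String> repartitioned = lines.repartition(numPartitions);

        System.out.println("count: " + repartitioned.count());
        sc.stop();
    }
}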

For another job, which I had to fine-tune in order to scale to 15T of data, I reported my approach in memoryOverhead issue in Spark, but I don't know if this is related; a sketch of that setting follows.
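
For reference, a hedged sketch of where that knob lives. Note that spark.yarn.executor.memoryOverhead is YARN-specific, so it would not affect the local-master setup in this question, and the 4096 MB value is purely illustrative:

import org.apache.spark.SparkConf;

public class MemoryOverheadSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .set("spark.executor.memory", "16g")
                // Off-heap headroom YARN reserves per executor container, in MB.
                // Too small a value makes YARN kill executors, which from the
                // driver's side can look exactly like a hang.
                .set("spark.yarn.executor.memoryOverhead", "4096"); // illustrative
        // ... create the context and run the job with this conf ...
    }
}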

As Karl Higley suggested (and I agree):

The DAGScheduler's failed: Set() message means that the set of failed stages is empty (i.e. nothing has failed yet).

Missing parents is the list of stages whose results are required to compute the requested results and which aren't already cached in memory.
