火花芯和任务并发 [英] Spark cores & tasks concurrency

查看:109
本文介绍了火花芯和任务并发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

关于火花,我有一个非常基本的问题.我通常使用50个内核运行spark作业.在查看作业进度时,大多数情况下它会显示50个并行运行的进程(这是应该做的),但有时它仅显示2个或4个并行运行的spark进程.像这样:

I've a very basic question about spark. I usually run spark jobs using 50 cores. While viewing the job progress, most of the times it shows 50 processes running in parallel (as it is supposed to do), but sometimes it shows only 2 or 4 spark processes running in parallel. Like this:

[Stage 8:================================>                      (297 + 2) / 500]

正在处理的RDD在100多个分区上为repartitioned.因此,这不应该成为问题.

The RDD's being processed are repartitioned on more than 100 partitions. So that shouldn't be an issue.

我有一个观察.我已经看到一种模式,在大多数情况下,SparkUI中的数据局部性显示为NODE_LOCAL,而在其他所有50个进程都在运行时,某些进程则显示为RACK_LOCAL. 这使我感到怀疑,也许是因为数据在处理之前在同一节点中被缓存以避免网络开销而发生,并且这减慢了进一步处理的速度.

I have an observations though. I've seen the pattern that most of the time it happens, the data locality in SparkUI shows NODE_LOCAL, while other times when all 50 processes are running, some of the processes show RACK_LOCAL. This makes me doubt that, maybe this happens because the data is cached before processing in the same node to avoid network overhead, and this slows down the further processing.

如果是这种情况,如何避免这种情况.如果不是这种情况,这是怎么回事?

If this is the case, what's the way to avoid it. And if this isn't the case, what's going on here?

推荐答案

经过一个星期或更长时间的努力之后,我认为我已经找到了造成此问题的原因.

After a week or more of struggling with the issue, I think I've found what was causing the problem.

如果您遇到相同的问题,最好的方法是检查Spark实例的配置是否正确.有一个很棒的有关它的cloudera博客帖子.

If you are struggling with the same issue, the good point to start would be to check if the Spark instance is configured fine. There is a great cloudera blog post about it.

但是,如果问题不在于配置(就像我这样),那么问题就出在代码中.问题是,有时由于不同的原因(连接歪斜,数据源中的分区不均匀等),您正在使用的RDD在2-3个分区上获取大量数据,而其余分区中的数据很少.

However, if the problem isn't with configuration (as was the case with me), then the problem is somewhere within your code. The issue is that sometimes due to different reasons (skewed joins, uneven partitions in data sources etc) the RDD you are working on gets a lot of data on 2-3 partitions and the rest of the partitions have very few data.

为了减少整个网络上的数据混乱,Spark尝试让每个执行者处理该节点上本地驻留的数据.因此,2-3个执行器工作了很长时间,而其余的执行器仅在几毫秒内就完成了数据.这就是为什么我遇到上面问题中描述的问题.

In order to reduce the data shuffle across the network, Spark tries that each executor processes the data residing locally on that node. So, 2-3 executors are working for a long time, and the rest of the executors are just done with the data in few milliseconds. That's why I was experiencing the issue I described in the question above.

调试此问题的方法是首先检查RDD的分区大小.如果一个或几个分区与其他分区相比非常大,那么下一步将是在大分区中查找记录,以便您可以知道,尤其是在偏斜联接的情况下,哪个键正偏斜.我写了一个小函数来调试它:

The way to debug this problem is to first of all check the partition sizes of your RDD. If one or few partitions are very big in comparison to others, then the next step would be to find the records in the large partitions, so that you could know, especially in the case of skewed joins, that what key is getting skewed. I've wrote a small function to debug this:

from itertools import islice
def check_skewness(df):
    sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing
    l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
    max_part = max(l,key=lambda item:item[1])
    min_part = min(l,key=lambda item:item[1])
    if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times
        print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
        print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5)     if i == max_part[0] else []).take(5))
    else:
        print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part

它为我提供了最小和最大的分区大小,并且如果两者之差超过5倍,它将打印出最大分区的5个元素,以便让您大致了解正在发生的事情.

It gives me the smallest and largest partition size, and if the difference between these two is more than 5 times, it prints 5 elements of the largest partition, to should give you a rough idea on what's going on.

一旦您发现问题是偏斜的分区,您可以找到一种方法来消除偏斜的密钥,或者您可以对数据帧进行重新分区,这将迫使它平均分配,您将现在看到所有执行器将在相同的时间工作,并且您将看到更少的可怕的OOM错误,并且处理速度也将大大加快.

Once you have figured out that the problem is skewed partition, you can find a way to get rid of that skewed key, or you can re-partition your dataframe, which will force it to get equally distributed, and you'll see now all the executors will be working for equal time and you'll see far less dreaded OOM errors and processing will be significantly fast too.

这些只是我作为Spark新手的两分钱,我希望Spark专家可以为这个问题添加更多内容,因为我认为Spark世界中的许多新手都经常遇到类似的问题.

These are just my two cents as a Spark novice, I hope Spark experts can add some more to this issue, as I think a lot of newbies in Spark world face similar kind of problems far too often.

这篇关于火花芯和任务并发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆