Spark cores & tasks concurrency


Question

I have a very basic question about Spark. I usually run Spark jobs using 50 cores. While viewing the job progress, most of the time it shows 50 processes running in parallel (as it is supposed to), but sometimes it shows only 2 or 4 Spark processes running in parallel. Like this:

[Stage 8:================================>                      (297 + 2) / 500]

The RDDs being processed are repartitioned into more than 100 partitions, so that shouldn't be the issue.
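(As a quick sanity check, not part of the original question, this is roughly how the partition count can be verified; 'df' here is just a stand-in dataframe built with spark.range.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(0, 1000000)          # placeholder dataframe
print(df.rdd.getNumPartitions())      # partitions the underlying RDD currently has
# Repartitioning only guarantees the partition count, not that records
# are spread evenly across those partitions
df = df.repartition(100)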

I do have one observation, though. I've noticed a pattern: most of the time this happens, the data locality shown in the Spark UI is NODE_LOCAL, while at other times, when all 50 processes are running, some of the processes show RACK_LOCAL. This makes me suspect that the data may be cached on the same node before processing to avoid network overhead, and that this slows down further processing.

If this is the case, is there a way to avoid it? And if it isn't, what's going on here?
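(On the locality side of the question, one setting that is sometimes tuned is spark.locality.wait, which controls how long the scheduler waits for a node-local slot before falling back to less-local ones; this is only a sketch of how it would be set, and, as the answer below explains, locality was not the root cause here.)

from pyspark.sql import SparkSession

# Lowering the locality wait (default is 3s) makes the scheduler give up on
# node-local placement sooner and launch tasks on any free executor
spark = (SparkSession.builder
         .config("spark.locality.wait", "1s")
         .getOrCreate())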

Answer

After a week or more of struggling with the issue, I think I've found what was causing the problem.

If you are struggling with the same issue, a good place to start is to check whether your Spark instance is configured well. There is a great Cloudera blog post about it.

However, if the problem isn't with the configuration (as was the case for me), then the problem is somewhere within your code. The issue is that sometimes, for various reasons (skewed joins, uneven partitions in the data source, etc.), the RDD you are working on ends up with a lot of data in 2-3 partitions while the remaining partitions hold very little. One quick check for a skewed join key is sketched below.
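(A minimal sketch of that check, not from the original answer; 'df' and 'join_key' are placeholders for your dataframe and join column. A heavily repeated key value is a common cause of a few huge partitions after a join.)

from pyspark.sql import functions as F

# Show the ten most frequent values of the join column
(df.groupBy("join_key")
   .count()
   .orderBy(F.desc("count"))
   .show(10, truncate=False))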

To reduce data shuffling across the network, Spark tries to have each executor process the data residing locally on that node. So 2-3 executors end up working for a long time, while the rest finish their data in a few milliseconds. That's why I was seeing the behavior described in the question above.

The way to debug this is first to check the partition sizes of your RDD. If one or a few partitions are very large compared to the others, the next step is to look at the records in those large partitions so that you can tell, especially in the case of a skewed join, which key is skewed. I wrote a small function to debug this:

from itertools import islice

def check_skewness(df):
    # Take a 1% sample so the check runs quickly even on large dataframes
    sampled_rdd = df.sample(False, 0.01).rdd.cache()
    # Count the records held by each partition of the sample
    counts = sampled_rdd.mapPartitionsWithIndex(
        lambda idx, it: [(idx, sum(1 for _ in it))]).collect()
    max_part = max(counts, key=lambda item: item[1])
    min_part = min(counts, key=lambda item: item[1])
    # Flag skew if the largest partition holds more than 5x the smallest
    # (also guard against an empty smallest partition)
    if min_part[1] == 0 or max_part[1] / min_part[1] > 5:
        print('Partitions skewed: largest partition', max_part,
              'smallest partition', min_part,
              '\nSample content of the largest partition:')
        print(sampled_rdd.mapPartitionsWithIndex(
            lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
    else:
        print('No skewness: largest partition', max_part,
              'smallest partition', min_part)

It gives me the smallest and largest partition sizes, and if the difference between the two is more than a factor of 5, it prints 5 elements from the largest partition, which should give you a rough idea of what's going on.
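(Usage is just a single call; 'my_df' below is a placeholder for whichever dataframe feeds the slow stage.)

check_skewness(my_df)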

Once you have figured out that the problem is a skewed partition, you can either find a way to get rid of the skewed key, or you can repartition your dataframe to force an even distribution. You'll then see that all the executors work for roughly equal time, you'll hit far fewer of the dreaded OOM errors, and processing will be significantly faster too. A sketch of both remedies follows.
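(A minimal sketch of both remedies, with hypothetical names: 'big_df' is the skewed side of the join, 'small_df' the other side, and 'join_key' the column being joined on; it is not from the original answer.)

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Remedy 1: force an even, shuffle-based redistribution of the rows
big_even = big_df.repartition(200)

# Remedy 2 (skewed join): salt the join key. The big side gets a random salt,
# the small side is replicated once per salt value, so rows sharing a hot key
# spread over several tasks instead of landing in a single partition.
num_salts = 10
big_salted = big_df.withColumn("salt", F.floor(F.rand() * num_salts))
small_salted = small_df.crossJoin(
    spark.range(num_salts).withColumnRenamed("id", "salt"))
joined = big_salted.join(small_salted, ["join_key", "salt"])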

These are just my two cents as a Spark novice. I hope Spark experts can add more to this, since I think a lot of newcomers to the Spark world run into this kind of problem far too often.
