How Spark distributes partitions to executors


Problem description


I have a performance issue and, after analyzing the Spark web UI, I found what seems to be data skewness:

Initially I thought the partitions were not evenly distributed, so I analyzed the row count per partition, but it seems normal (no outliers); see: how to manually run pyspark's partitioning function for debugging

But the problem persists, and I can see that one executor is processing most of the data:

So the hypothesis now is that the partitions are not evenly distributed across the executors. The question is: how does Spark distribute partitions to executors, and how can I change that to solve my skewness problem?

The code is very simple:

hive_query = """SELECT ... FROM <multiple joined hive tables>"""
df = sqlContext.sql(hive_query).cache()
print(df.count())
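
For reference, a minimal sketch of the per-partition row-count check mentioned above (the helper name is my own; spark_partition_id is a standard PySpark function):

from pyspark.sql import functions as F

def rows_per_partition(df):
    # Tag each row with the id of the partition it lives in, then count rows
    # per partition to see how evenly the data is spread.
    return (df.withColumn("partition_id", F.spark_partition_id())
              .groupBy("partition_id")
              .count()
              .orderBy("count", ascending=False))

# e.g. rows_per_partition(df).show(df.rdd.getNumPartitions())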

Update: after posting this question I did some further analysis and found that 3 tables cause this; if they are removed, the data is evenly distributed across the executors and performance improves. So I added the Spark SQL hint /*+ BROADCASTJOIN(<table_name>) */ and it worked; performance is much better now, but the question remains:

why do these tables (including a small table with only 6 rows) cause this uneven distribution across executors when they are added to the query?
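
For reference, a sketch of the broadcast hint mentioned in the update; <table_name> and the query body are placeholders as in the question, and the commented DataFrame form is the equivalent API call, not code from the original post:

hive_query = """
SELECT /*+ BROADCASTJOIN(<table_name>) */ ...
FROM <multiple joined hive tables>
"""
df = sqlContext.sql(hive_query).cache()

# Equivalent when the join is written with the DataFrame API:
# from pyspark.sql.functions import broadcast
# result = big_df.join(broadcast(small_df), "join_key")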

Solution

When you read data from HDFS, the number of partitions depends on the number of blocks you are reading. From the attached images it looks like your data is not distributed evenly across the cluster. Try repartitioning your data and tweaking the number of cores and executors.
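
A hedged sketch of what that could look like for the query in the question; the partition count and resource numbers are illustrative assumptions, not values from the answer:

# Spread the joined result over more, evenly sized partitions before caching;
# 200 is only an illustrative target, tune it to your data volume.
df = sqlContext.sql(hive_query).repartition(200).cache()
print(df.count())

# Cores and executors are typically adjusted at submit time, for example:
#   spark-submit --num-executors 10 --executor-cores 4 --executor-memory 8g ...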

If you are repartitioning your data and the hash partitioner returns one value much more often than the others, that can lead to data skew.
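
A self-contained sketch that reproduces this effect with synthetic data (the numbers are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("hash-skew-demo").getOrCreate()

# 99% of the rows share the same key value (0); the remaining 1% get unique keys.
skewed = spark.range(1000000).withColumn(
    "key", F.when(F.col("id") % 100 == 0, F.col("id")).otherwise(F.lit(0)))

# Hash-repartitioning on that column sends every key=0 row to the same partition,
# so one of the 10 partitions ends up with roughly 99% of the data.
by_key = skewed.repartition(10, "key")
(by_key.withColumn("partition_id", F.spark_partition_id())
       .groupBy("partition_id").count()
       .orderBy("count", ascending=False)
       .show())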

If this happens after performing a join, then your data is skewed.
