How Spark distributes partitions to executors
I have a performance issue, and after analyzing the Spark web UI I found what seems to be data skew:
Initially I thought the partitions were not evenly distributed, so I analyzed the row count per partition, but it looks normal (no outliers):
But the problem persists, and I can see that one executor is processing most of the data:
So the hypothesis now is that partitions are not evenly distributed across executors. The questions are: how does Spark distribute partitions to executors, and how can I change that to solve my skew problem?
The code is very simple:
hive_query = """SELECT ... FROM <multiple joined hive tables>"""
df = sqlContext.sql(hive_query).cache()
print(df.count())
Update: after posting this question I did further analysis and found that three tables cause this. If they are removed, the data is evenly distributed across the executors and performance improves, so I added the Spark SQL hint /*+ BROADCASTJOIN(<table_name>) */ and it worked; performance is much better now. But the question remains:
Why do these tables (including one small table of only 6 rows) cause this uneven distribution across executors when added to the query?
When you are reading data from HDFS, the number of partitions depends on the number of blocks you are reading. From the images attached, it looks like your data is not distributed evenly across the cluster. Try repartitioning your data and tweaking the number of cores and executors.
If you are repartitioning your data by key and the hash partitioner maps one value far more often than the others, that can lead to data skew.
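That effect can be seen without Spark at all: hash partitioning sends every copy of the same key to the same partition, so a skewed key distribution becomes a skewed partition. A pure-Python illustration of `hash(key) % numPartitions` with an invented "hot" key:

```python
from collections import Counter

# Skewed key distribution: one hot key accounts for 90 of 100 rows.
keys = ["hot"] * 90 + ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
num_partitions = 4

# Hash partitioning: identical keys always land in the same partition,
# so the hot key's 90 rows all pile onto a single partition.
sizes = Counter(hash(k) % num_partitions for k in keys)
print(sorted(sizes.values(), reverse=True))
```

Whichever executor ends up holding that one oversized partition does most of the work, which matches the single busy executor in the question.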
If this happens after performing a join, then your data is skewed on the join key.