How to avoid excessive shuffles in join operation in pyspark?


Problem description

I have a large Spark DataFrame of around 25 GB that I have to join with another DataFrame of about 15 GB.

Now when I run the code it takes around 15 minutes to complete.

The resource allocation is 40 executors with 128 GB of memory each.

When I went through its execution plan, the sort merge join was being performed.

The problem is:

The join is performed around 5 to 6 times on the same key but against different tables, and because of that most of the time is spent sorting the data and co-locating the partitions before merging/joining the data, for every join performed.

So is there any way to sort the data before performing the join so that the sort operation is not performed for each join, or to optimize it in such a way that less time is spent sorting and more time actually joining the data?

I just want to sort my dataframe before performing the join, but I am not sure how to do it.

For example:

If my dataframe is joining on the id column:

joined_df = df1.join(df2, df1.id == df2.id)

How can I sort the dataframe based on 'id' before joining so that the partitions are co-located?
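
The execution plan mentioned above can be inspected with explain() on the joined DataFrame; with two large, un-bucketed inputs it will typically show a SortMergeJoin preceded by an Exchange (the shuffle) and a Sort on each side. A minimal sketch, assuming the same df1, df2 and joined_df as in the example:

joined_df = df1.join(df2, df1.id == df2.id)
joined_df.explain()  # look for Exchange (hashpartitioning on id) + Sort under the SortMergeJoin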

Recommended answer

So is there any way to sort the data before performing the join so that the sort operation is not performed for each join, or to optimize it in such a way that less time is spent sorting and more time actually joining the data?

This smells like bucketing.

Bucketing is an optimization technique that uses buckets (and bucketing columns) to determine data partitioning and avoid data shuffling.

The idea is to bucketBy the datasets so Spark knows that the keys are co-located (pre-shuffled already). The number of buckets and the bucketing columns have to be the same across the DataFrames participating in the join.

Please note that this is supported for Hive or Spark tables (saveAsTable), as the bucket metadata is fetched from a metastore (Spark's or Hive's).
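
Below is a minimal sketch of what this can look like in PySpark, reusing df1, df2 and the id column from the question. The table names df1_bucketed/df2_bucketed, the bucket count of 200, and the active SparkSession named spark are illustrative assumptions, not part of the original answer.

# Persist both DataFrames as bucketed (and sorted) tables on the join key.
# bucketBy/sortBy only work together with saveAsTable, and both tables must
# use the same number of buckets and the same bucketing column.
(df1.write
    .mode("overwrite")
    .bucketBy(200, "id")   # 200 buckets is an illustrative choice
    .sortBy("id")
    .saveAsTable("df1_bucketed"))

(df2.write
    .mode("overwrite")
    .bucketBy(200, "id")
    .sortBy("id")
    .saveAsTable("df2_bucketed"))

# Read the bucketed tables back; Spark picks up the bucket metadata from the
# metastore, so repeated joins on id should no longer need an Exchange
# (shuffle) on either side. Depending on the number of files per bucket,
# a Sort step may still appear in the plan.
df1_b = spark.table("df1_bucketed")
df2_b = spark.table("df2_bucketed")

joined_df = df1_b.join(df2_b, df1_b.id == df2_b.id)
joined_df.explain()  # verify that no Exchange appears under the SortMergeJoin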
