How to physically partition data to avoid shuffle in Spark SQL joins


Question

I have a requirement to join 5 medium-size tables (~80 GB each) with a big input dataset of ~800 GB. All data resides in Hive tables. I am using Spark SQL 1.6.1 to achieve this. The join takes 40 minutes to complete with --num-executors 20 --driver-memory 40g --executor-memory 65g --executor-cores 6. All joins are sort-merge outer joins. I am also seeing a lot of shuffling happening.

I bucketed all the tables in Hive into the same number of buckets, so that matching keys from all tables would go to the same Spark partitions when the data is first loaded. But Spark does not seem to understand the bucketing.

Is there any other way I can physically partition and sort the data in Hive (in terms of part files), so that Spark knows about the partitioning keys while loading the data from Hive itself, and performs the join within the same partitioning without shuffling data around? This would avoid additional repartitioning after loading the data from Hive.
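For context, the bucketing described above is declared in Hive roughly as follows. The table name, column names, and bucket count here are hypothetical, and as the answer below notes, Spark SQL 1.6.1 reads this metadata but does not exploit it when planning joins:

```sql
-- Hypothetical DDL: each of the 6 tables would use the same
-- bucketing column and the same bucket count.
CREATE TABLE input_data (key1 STRING, value STRING)
CLUSTERED BY (key1) SORTED BY (key1 ASC) INTO 256 BUCKETS
STORED AS ORC;
```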

Answer

First of all, Spark SQL 1.6.1 does not support Hive buckets yet. So in this case we are left with Spark-level operations to ensure that matching keys from all tables go to the same Spark partitions while loading the data. The Spark API provides repartition and sortWithinPartitions to achieve this, e.g.

val part1 = df1.repartition(df1("key1")).sortWithinPartitions(df1("key1"))

In the same way, you can generate partitions for the remaining tables and join them on the key that was sorted within partitions.
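Extended to several tables, the pattern above might look like the following sketch. The sqlContext handle, the table names, and the join key "key1" are assumptions for illustration, not names from the original question:

```scala
import org.apache.spark.sql.DataFrame

// Assumes an existing HiveContext named sqlContext (standard in Spark 1.6.1)
// and Hive tables big_input, table1, table2 sharing a join key "key1"
// (all names hypothetical).
def partitionByKey(df: DataFrame): DataFrame =
  df.repartition(df("key1")).sortWithinPartitions(df("key1"))

val big = partitionByKey(sqlContext.table("big_input"))
val t1  = partitionByKey(sqlContext.table("table1"))
val t2  = partitionByKey(sqlContext.table("table2"))

// Both sides of each join are now hash-partitioned and sorted on key1,
// so the sort-merge join does not need to shuffle them again.
val joined = big
  .join(t1, Seq("key1"), "outer")
  .join(t2, Seq("key1"), "outer")
```

Note that repartition(col) hash-partitions into spark.sql.shuffle.partitions partitions (200 by default), so all tables end up with the same partitioning as long as that setting is not changed between calls.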

This makes the join a "shuffle-free" operation, but it comes with a significant computational cost. Caching the DataFrames (you can cache the newly created partitioned DataFrames) performs better if the operation will be executed multiple times. Hope this helps.
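As a sketch of the caching suggestion, reusing the hypothetical df1 and "key1" names from the earlier snippet:

```scala
// Cache the repartitioned, sorted DataFrame so the expensive
// repartition + sort is not recomputed every time it is joined.
val part1 = df1.repartition(df1("key1"))
  .sortWithinPartitions(df1("key1"))
  .cache()

part1.count() // force materialization once, before the joins reuse it
```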

