How to physically partition data to avoid shuffle in Spark SQL joins

Question

I have a requirement to join 5 medium-sized tables (~80 GB each) with a large input dataset (~800 GB). All data resides in Hive tables. I am using Spark SQL 1.6.1 to achieve this. The join takes 40 minutes to complete with --num-executors 20 --driver-memory 40g --executor-memory 65g --executor-cores 6. All joins are sort-merge outer joins, and I am also seeing a lot of shuffle happening.
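
For context, here is a minimal Spark 1.6 sketch of the kind of join being described; the table names (big_input, dim1 .. dim5) and the join key (key1) are hypothetical:

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)  // sc: the existing SparkContext

val big  = sqlContext.table("big_input")                               // ~800 GB input
val dims = Seq("dim1", "dim2", "dim3", "dim4", "dim5").map(sqlContext.table)  // ~80 GB each

// Five sort-merge outer joins; without any co-partitioning information,
// each join shuffles both of its inputs across the cluster.
val joined = dims.foldLeft(big)((acc, d) => acc.join(d, Seq("key1"), "left_outer"))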

I bucketed all the tables in Hive into the same number of buckets, so that matching keys from all tables would land in the same Spark partitions when the data is first loaded. But it seems Spark does not understand the bucketing.
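
For reference, the bucketing attempt looks roughly like the following DDL issued through the HiveContext (table and column names are hypothetical); Spark 1.6 stores this metadata in the metastore but does not exploit it when planning joins:

// Hypothetical bucketed-table DDL; Spark SQL 1.6 ignores the
// CLUSTERED BY metadata when deciding whether a shuffle is needed.
sqlContext.sql("""
  CREATE TABLE dim1_bucketed (key1 STRING, value STRING)
  CLUSTERED BY (key1) SORTED BY (key1) INTO 256 BUCKETS
""")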

Is there any other way I can physically partition and sort the data in Hive (controlling the number of part files) so that Spark knows about the partitioning keys while loading the data from Hive itself, and does the join within the same partitioning without shuffling data around? This would avoid the additional repartitioning after loading the data from Hive.

Answer

First of all, Spark SQL 1.6.1 does not support Hive buckets yet. So in this case we are left with Spark-level operations that ensure matching keys from all tables go to the same Spark partitions while loading the data. The Spark API provides repartition and sortWithinPartitions to achieve this, e.g.

val part1 = df1.repartition(df1("key1")).sortWithinPartitions(df1("key1"))

In the same way you can produce repartitioned versions of the remaining tables, then join them on the key that was sorted within partitions, as sketched below.
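
A minimal end-to-end sketch of this approach, assuming two DataFrames df1 and df2 with a common join key key1 (both names are illustrative):

// Repartition and sort both sides on the join key. With the same
// spark.sql.shuffle.partitions setting, both DataFrames end up
// hash-partitioned identically on key1.
val part1 = df1.repartition(df1("key1")).sortWithinPartitions(df1("key1"))
val part2 = df2.repartition(df2("key1")).sortWithinPartitions(df2("key1"))

// The sort-merge join can now reuse the existing partitioning and
// ordering instead of inserting another exchange.
val joined = part1.join(part2, Seq("key1"), "outer")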

This makes the join a "shuffle-free" operation, but it comes with a significant computational cost. Caching the DataFrames (cache the newly repartitioned versions) performs better if the join will be executed multiple times. Hope this helps.
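
For example, if part1 from the sketch above feeds several joins, caching it amortizes the one-time repartitioning cost (again an illustrative sketch):

// Persist the repartitioned DataFrame so later joins reuse it
// without recomputing the repartition and sort.
part1.cache()
part1.count()  // forces materialization before the joins run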
