Partition data for efficient joining for Spark DataFrame/Dataset


Problem description

I need to join many DataFrames together based on some shared key columns. For a key-value RDD, one can specify a partitioner so that data points with the same key are shuffled to the same executor, making the join more efficient (if there are shuffle-related operations before the join). Can the same thing be done on Spark DataFrames or Datasets?
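For context, the RDD-level technique the question refers to looks roughly like this. This is a minimal sketch with made-up sample data, assuming an existing SparkContext named sc:

import org.apache.spark.HashPartitioner

// Two key-value RDDs keyed by userId (hypothetical sample data)
val left  = sc.parallelize(Seq((1, "Alice"), (2, "Bob")))
val right = sc.parallelize(Seq((1, "addr-a"), (2, "addr-b")))

// Co-partition both RDDs with the same partitioner so records with the
// same key land in the same partition (and thus the same executor)
val partitioner = new HashPartitioner(8)
val leftPart  = left.partitionBy(partitioner)
val rightPart = right.partitionBy(partitioner)

// The join can now use the existing partitioning instead of a full shuffle
val joinedRdd = leftPart.join(rightPart)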

Recommended answer

You can repartition a DataFrame after loading it if you know you'll be joining it multiple times

val users = spark.read.load("/path/to/users").repartition('userId)

val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition

val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned

So it'll shuffle the data once and then reuse the shuffle files when joining subsequent times.
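A hedged way to sanity-check this (using the names from the snippet above) is to look at the physical plans and the Spark UI: both joins contain the Exchange introduced by repartition('userId), but when the second join runs, the shuffle map stage for users should be reused rather than recomputed, showing up as a skipped stage in the UI.

joined1.explain() // Exchange hashpartitioning(userId, ...) from the repartition
joined2.explain() // the same Exchange appears in the plan; at runtime its stage
                  // should be reused (visible as a "skipped" stage in the Spark UI)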

However, if you know you'll be repeatedly shuffling data on certain keys, your best bet would be to save the data as bucketed tables. This will write the data out already pre-hash partitioned, so when you read the tables in and join them you avoid the shuffle. You can do so as follows:

// you need to pick a number of buckets that makes sense for your data
// (bucketBy is a DataFrameWriter method, so go through .write)
users.write.bucketBy(50, "userId").saveAsTable("users")
addresses.write.bucketBy(50, "userId").saveAsTable("addresses")

val users = spark.read.table("users")
val addresses = spark.read.table("addresses")

val joined = users.join(addresses, "userId")
joined.show() // <-- no shuffle since tables are co-partitioned

In order to avoid a shuffle, the tables have to use the same bucketing (e.g. same number of buckets and joining on the bucket columns).
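If you want to double-check that two tables really are co-bucketed before relying on this, a rough sketch (using the table names from the example above) is to inspect the table metadata and the join plan; note that bucketing must be enabled, which it is by default via spark.sql.sources.bucketing.enabled:

// Bucket metadata ("Num Buckets", "Bucket Columns") appears in DESCRIBE FORMATTED
spark.sql("DESCRIBE FORMATTED users").show(100, truncate = false)
spark.sql("DESCRIBE FORMATTED addresses").show(100, truncate = false)

// With matching buckets on userId, the join plan should contain
// no Exchange on either side
joined.explain()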
