Partition data for efficient joining for Spark dataframe/dataset


Question

I need to join many DataFrames together based on some shared key columns. For a key-value RDD, one can specify a partitioner so that data points with the same key are shuffled to the same executor, making the join more efficient (if there are shuffle-related operations before the join). Can the same thing be done with Spark DataFrames or DataSets?
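
For reference, the RDD version of this technique looks roughly like the following (a minimal sketch; the sample data and the partition count of 100 are placeholders):

import org.apache.spark.HashPartitioner

// Hypothetical pair RDDs keyed by userId
val userRdd = sc.parallelize(Seq((1, "alice"), (2, "bob")))
val addrRdd = sc.parallelize(Seq((1, "12 Main St"), (2, "34 Oak Ave")))

// Give both sides the same partitioner and cache them, so the join
// becomes a narrow dependency and triggers no further shuffle
val partitioner = new HashPartitioner(100)
val usersPart = userRdd.partitionBy(partitioner).persist()
val addrsPart = addrRdd.partitionBy(partitioner).persist()

val joinedRdd = usersPart.join(addrsPart)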

Answer

You can repartition a DataFrame after loading it if you know you'll be joining it multiple times:

val users = spark.read.load("/path/to/users").repartition('userId)

val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition

val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned

So it'll shuffle the data once and then reuse the shuffle files when joining subsequent times.
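
One way to see this: the physical plan for both joins will still show an Exchange for the repartition, but at runtime the second action skips that stage because its shuffle files already exist (this appears as "skipped" stages in the Spark UI).

joined1.explain() // plan shows an Exchange for users (the repartition)
joined2.explain() // same Exchange appears, but its stage is skipped when run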

However, if you know you'll be repeatedly shuffling data on certain keys, your best bet would be to save the data as bucketed tables. This will write the data out already pre-hash partitioned, so when you read the tables in and join them you avoid the shuffle. You can do so as follows:

// you need to pick a number of buckets that makes sense for your data
// (bucketBy lives on DataFrameWriter, so go through .write)
users.write.bucketBy(50, "userId").saveAsTable("users")
addresses.write.bucketBy(50, "userId").saveAsTable("addresses")

val users = spark.read.table("users")
val addresses = spark.read.table("addresses")

val joined = users.join(addresses, "userId")
joined.show() // <-- no shuffle since tables are co-partitioned

In order to avoid a shuffle, the tables have to use the same bucketing (e.g. same number of buckets and joining on the bucket columns).
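
As a quick sanity check (a minimal sketch, assuming the tables created above), you can inspect the recorded bucketing metadata and confirm the join plan contains no Exchange:

// "Num Buckets" and "Bucket Columns" show up in the table metadata
spark.sql("DESCRIBE EXTENDED users").show(100, truncate = false)

// The physical plan for the bucketed join should contain no Exchange
joined.explain()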
