Spark: Prevent shuffle/exchange when joining two identically partitioned dataframes
Problem description
I have two dataframes df1 and df2 and I want to join these tables many times on a high cardinality field called visitor_id. I would like to perform only one initial shuffle and have all the joins take place without shuffling/exchanging data between Spark executors.
To do so, I have created another column called visitor_partition that consistently assigns each visitor_id a random value in [0, 1000). I have used a custom partitioner to ensure that df1 and df2 are exactly partitioned such that each partition contains rows from exclusively one value of visitor_partition. This initial repartition is the only time I want to shuffle the data.
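The consistent assignment described above can be sketched in plain Python before wiring it into Spark. This is only an illustration, not the asker's actual partitioner: the md5-based hash and the visitor_partition name are assumptions, and in Spark the function would typically be registered as a UDF to derive the column on both dataframes.

```python
import hashlib

NUM_PARTITIONS = 1000  # matches the [0, 1000) range above


def visitor_partition(visitor_id: str) -> int:
    """Map a visitor_id to a stable partition in [0, 1000).

    Any deterministic hash works; md5 is used here purely for
    illustration. The key property is that the same visitor_id
    always lands in the same partition for both df1 and df2.
    """
    digest = hashlib.md5(visitor_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_PARTITIONS
```

Because the mapping is a pure function of visitor_id, both dataframes can compute the column independently and still end up with co-located keys after one repartition each.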
I have saved each dataframe to parquet in s3, partitioning by visitor_partition -- for each dataframe, this creates 1000 files organized as df1/visitor_partition=0, df1/visitor_partition=1 ... df1/visitor_partition=999.
Now I load each dataframe from the parquet and register them as temp views via df1.createOrReplaceTempView('df1') (and the same for df2), and then run the following query:
SELECT
...
FROM
df1 FULL JOIN df2 ON
df1.visitor_partition = df2.visitor_partition AND
df1.visitor_id = df2.visitor_id
In theory, the query execution planner should realize that no shuffling is necessary here. E.g., a single executor could load in the data from df1/visitor_partition=1 and df2/visitor_partition=1 and join the rows in there. However, in practice Spark 2.4.4's query planner performs a full data shuffle here.
Is there some way I can prevent this shuffle from taking place?
Recommended answer
In the following example, the value of the column VisitorID will be hashed into 500 buckets. Normally, for the join Spark would perform an exchange phase based on the hash of VisitorID. However, in this case you already have the data pre-partitioned by that hash.
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import col

# Build a sample dataframe of (VisitorID, visitor_partition) pairs
inputRdd = sc.parallelize([(i, i % 200) for i in range(0, 1000000)])
schema = StructType([StructField("VisitorID", IntegerType(), True),
                     StructField("visitor_partition", IntegerType(), True)])
inputdf = inputRdd.toDF(schema)

# Bucket by VisitorID into 500 buckets at write time, so the hash
# partitioning is baked into the stored table
inputdf.write.bucketBy(500, "VisitorID").saveAsTable("bucketed_table")

# Both sides of the join read from the bucketed table
inputDf1 = spark.sql("select * from bucketed_table")
inputDf2 = spark.sql("select * from bucketed_table")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"),
                                      col("df1.VisitorID") == col("df2.VisitorID"))
Sometimes the Spark query optimizer still chooses a broadcast exchange, so for our example, let's disable auto broadcasting:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
The physical plan is as follows:
== Physical Plan ==
*(3) SortMergeJoin [VisitorID#351], [VisitorID#357], Inner
:- *(1) Sort [VisitorID#351 ASC NULLS FIRST], false, 0
:  +- *(1) Project [VisitorID#351, visitor_partition#352]
:     +- *(1) Filter isnotnull(VisitorID#351)
:        +- *(1) FileScan parquet default.bucketed_6[VisitorID#351,visitor_partition#352] Batched: true, DataFilters: [isnotnull(VisitorID#351)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500
+- *(2) Sort [VisitorID#357 ASC NULLS FIRST], false, 0
   +- *(2) Project [VisitorID#357, visitor_partition#358]
      +- *(2) Filter isnotnull(VisitorID#357)
         +- *(2) FileScan parquet default.bucketed_6[VisitorID#357,visitor_partition#358] Batched: true, DataFilters: [isnotnull(VisitorID#357)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500
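Why no Exchange node appears can be illustrated outside Spark: when both sides are bucketed by the same hash of the join key, a key in bucket i of one table can only match bucket i of the other, so each bucket pair is joined independently with no cross-bucket data movement. Below is a minimal pure-Python sketch of that idea; the crc32 hash and the 4-bucket count are illustrative assumptions, not Spark's actual Murmur3-based bucketing.

```python
import zlib

N_BUCKETS = 4  # Spark used 500 buckets above; 4 keeps the sketch small


def bucket_of(key):
    # Any stable hash works for the sketch; Spark uses Murmur3 internally.
    return zlib.crc32(str(key).encode()) % N_BUCKETS


def bucketize(rows, key_fn):
    """Pre-partition rows by hash of the join key (the 'write' side)."""
    buckets = [[] for _ in range(N_BUCKETS)]
    for row in rows:
        buckets[bucket_of(key_fn(row))].append(row)
    return buckets


def bucketed_join(left_buckets, right_buckets, lkey, rkey):
    """Join bucket i with bucket i only -- no cross-bucket traffic."""
    out = []
    for lb, rb in zip(left_buckets, right_buckets):
        # Hash-join within the bucket pair
        index = {}
        for r in rb:
            index.setdefault(rkey(r), []).append(r)
        for l in lb:
            for r in index.get(lkey(l), []):
                out.append((l, r))
    return out


left = [(i, "L%d" % i) for i in range(10)]        # ids 0..9
right = [(i, "R%d" % i) for i in range(5, 15)]    # ids 5..14
key = lambda row: row[0]
result = bucketed_join(bucketize(left, key), bucketize(right, key), key, key)
# Matching ids are 5..9, so the join produces 5 pairs.
```

Because the same key always hashes to the same bucket on both sides, the result is identical to a full shuffle join; only the data movement differs.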
Doing something like:
inputdf.write.partitionBy("visitor_partition").saveAsTable("partitionBy_2")
does indeed create a structure with one folder per partition. But it does not help here, because the Spark join is based on the hash of the join key and cannot leverage your custom directory structure.
I misunderstood your example. I believe you were talking about something like partitionBy, not repartition as mentioned in the previous version.