Spark: Prevent shuffle/exchange when joining two identically partitioned dataframes

Problem description

I have two dataframes df1 and df2 and I want to join these tables many times on a high-cardinality field called visitor_id. I would like to perform only one initial shuffle and have all the joins take place without shuffling/exchanging data between Spark executors.

To do so, I have created another column called visitor_partition that consistently assigns each visitor_id a random value in [0, 1000). I have used a custom partitioner to ensure that df1 and df2 are partitioned exactly such that each partition contains exclusively rows from one value of visitor_partition. This initial repartition is the only time I want to shuffle the data.
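
For illustration, here is a minimal PySpark sketch of one way to derive such a visitor_partition column and co-partition both dataframes; the hash-based assignment and the use of repartition are assumptions, since the question's custom partitioner is not shown.

from pyspark.sql import functions as F

NUM_PARTITIONS = 1000  # matches the [0, 1000) range described in the question

# Derive a stable partition key from visitor_id (an assumption: the question
# may instead persist a random per-visitor_id assignment).
df1 = df1.withColumn("visitor_partition", F.abs(F.hash("visitor_id")) % NUM_PARTITIONS)
df2 = df2.withColumn("visitor_partition", F.abs(F.hash("visitor_id")) % NUM_PARTITIONS)

# One explicit shuffle so both dataframes are co-partitioned on visitor_partition.
# (The question uses a custom partitioner for an exact one-value-per-partition
# layout; repartition() is a simpler approximation.)
df1 = df1.repartition(NUM_PARTITIONS, "visitor_partition")
df2 = df2.repartition(NUM_PARTITIONS, "visitor_partition")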

I have saved each dataframe to parquet in S3, partitioning by visitor_partition -- for each dataframe, this creates 1000 files organized in df1/visitor_partition=0, df1/visitor_partition=1 ... df1/visitor_partition=999.
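
A sketch of the corresponding write (the S3 bucket name is hypothetical):

# Hypothetical output paths; each write creates one sub-directory per
# visitor_partition value, e.g. .../df1/visitor_partition=0/ ... /visitor_partition=999/.
df1.write.mode("overwrite").partitionBy("visitor_partition").parquet("s3://my-bucket/df1")
df2.write.mode("overwrite").partitionBy("visitor_partition").parquet("s3://my-bucket/df2")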

Now I load each dataframe from parquet and register them as temp views via df1.createOrReplaceTempView('df1') (and likewise for df2), and then run the following query:

SELECT
   ...
FROM
  df1 FULL JOIN df2 ON
    df1.visitor_partition = df2.visitor_partition AND
    df1.visitor_id = df2.visitor_id
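
Wired up in PySpark, this might look roughly as follows (the paths are hypothetical and the SELECT list is only a placeholder for the columns elided above):

df1 = spark.read.parquet("s3://my-bucket/df1")
df2 = spark.read.parquet("s3://my-bucket/df2")
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')

joined = spark.sql("""
    SELECT df1.visitor_id, df2.visitor_id AS visitor_id_2  -- placeholder select list
    FROM df1 FULL JOIN df2 ON
        df1.visitor_partition = df2.visitor_partition AND
        df1.visitor_id = df2.visitor_id
""")
joined.explain()  # on Spark 2.4.4 this still shows an Exchange on both sides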

In theory, the query execution planner should realize that no shuffling is necessary here. E.g., a single executor could load in the data from df1/visitor_partition=1 and df2/visitor_partition=1 and join the rows there. However, in practice Spark 2.4.4's query planner performs a full data shuffle here.

Is there some way I can prevent this shuffle from taking place?

Recommended answer

You can use the bucketBy method (other documentation).

In the following example, the values of the column VisitorID are hashed into 500 buckets. Normally, for the join, Spark would perform an exchange phase based on the hash of VisitorID; in this case, however, the data is already pre-partitioned by that hash.

from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType

# Assumes an active SparkSession (`spark`) and SparkContext (`sc`).
# Build a sample dataframe of (VisitorID, visitor_partition) pairs.
inputRdd = sc.parallelize(list((i, i % 200) for i in range(0, 1000000)))

schema = StructType([StructField("VisitorID", IntegerType(), True),
                     StructField("visitor_partition", IntegerType(), True)])

inputdf = inputRdd.toDF(schema)

# Write the data hash-bucketed by VisitorID into 500 buckets.
inputdf.write.bucketBy(500, "VisitorID").saveAsTable("bucketed_table")

# Read the bucketed table back and join it with itself on VisitorID.
inputDf1 = spark.sql("select * from bucketed_table")
inputDf2 = spark.sql("select * from bucketed_table")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"),
                                      col("df1.VisitorID") == col("df2.VisitorID"))

Sometimes the Spark query optimizer still chooses a broadcast exchange, so for our example, let's disable auto broadcasting:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
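
To inspect the resulting plan, you can call explain() on the joined dataframe (a small addition here, reusing the variable names from the snippet above):

inputDf3.explain()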

The physical plan looks as follows:

== Physical Plan ==
*(3) SortMergeJoin [VisitorID#351], [VisitorID#357], Inner
:- *(1) Sort [VisitorID#351 ASC NULLS FIRST], false, 0
:  +- *(1) Project [VisitorID#351, visitor_partition#352]
:     +- *(1) Filter isnotnull(VisitorID#351)
:        +- *(1) FileScan parquet default.bucketed_6[VisitorID#351,visitor_partition#352] Batched: true, DataFilters: [isnotnull(VisitorID#351)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500
+- *(2) Sort [VisitorID#357 ASC NULLS FIRST], false, 0
   +- *(2) Project [VisitorID#357, visitor_partition#358]
      +- *(2) Filter isnotnull(VisitorID#357)
         +- *(2) FileScan parquet default.bucketed_6[VisitorID#357,visitor_partition#358] Batched: true, DataFilters: [isnotnull(VisitorID#357)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500

Doing something like this:

inputdf.write.partitionBy("visitor_partition").saveAsTable("partitionBy_2")

This does indeed create a directory structure with one folder per partition, but it does not work here, since the Spark join is based on the hash and cannot leverage your custom directory structure.

I misunderstood your example. I believe you were talking about something like partitionBy, not repartition as mentioned in the previous version.
