Spark: Prevent shuffle/exchange when joining two identically partitioned dataframes

Problem description

I have two dataframes, df1 and df2, and I want to join these tables many times on a high-cardinality field called visitor_id. I would like to perform only one initial shuffle and have all the joins take place without shuffling/exchanging data between Spark executors.

To do so, I have created another column called visitor_partition that consistently assigns each visitor_id a random value in [0, 1000). I have used a custom partitioner to ensure that df1 and df2 are partitioned so that each partition contains rows from exactly one value of visitor_partition. This initial repartition is the only time I want to shuffle the data.
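
A minimal sketch of how such a bucketing column and the one-time repartition could be produced (the hash expression and the use of repartition here are assumptions, not the asker's exact code):

from pyspark.sql import functions as F

# Deterministically map each visitor_id to a bucket in [0, 1000) and
# repartition both dataframes on that bucket; this is the only intended shuffle.
bucket_expr = F.expr("pmod(hash(visitor_id), 1000)")

df1 = df1.withColumn("visitor_partition", bucket_expr).repartition(1000, "visitor_partition")
df2 = df2.withColumn("visitor_partition", bucket_expr).repartition(1000, "visitor_partition")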

I have saved each dataframe to parquet in S3, partitioning by visitor_partition; for each dataframe, this creates 1000 partition directories, organized as df1/visitor_partition=0, df1/visitor_partition=1, ..., df1/visitor_partition=999.
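
A sketch of that write step (the S3 bucket and paths below are placeholders):

# Write each dataframe partitioned by visitor_partition, producing one
# directory per value, e.g. .../df1/visitor_partition=0 through =999.
df1.write.partitionBy("visitor_partition").parquet("s3://my-bucket/df1")
df2.write.partitionBy("visitor_partition").parquet("s3://my-bucket/df2")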

Now I load each dataframe from parquet and register it as a temp view via df1.createOrReplaceTempView('df1') (and likewise for df2), and then run the following query; a sketch of this load-and-register step appears after the query.

SELECT
   ...
FROM
  df1 FULL JOIN df2 ON
    df1.visitor_partition = df2.visitor_partition AND
    df1.visitor_id = df2.visitor_id
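
For reference, a minimal sketch of the load-and-register step described above; the S3 paths are placeholders:

# Reload each pre-partitioned dataset from parquet (paths are placeholders)
df1 = spark.read.parquet("s3://my-bucket/df1")
df2 = spark.read.parquet("s3://my-bucket/df2")

# Register them as temp views so the FULL JOIN query above can be run via spark.sql(...)
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")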

In theory, the query execution planner should realize that no shuffling is necessary here. For example, a single executor could load the data from df1/visitor_partition=1 and df2/visitor_partition=1 and join the rows in there. However, in practice Spark 2.4.4's query planner performs a full data shuffle here.

Is there some way I can prevent this shuffle from taking place?

Recommended answer

You can use the bucketBy method of the DataFrameWriter.

In the following example, the values of the column VisitorID are hashed into 500 buckets. Normally, Spark would perform an exchange phase for the join based on the hash of VisitorID; in this case, however, the data is already pre-partitioned by that same hash.

from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import col

# Build a sample dataset of (VisitorID, visitor_partition) pairs
inputRdd = sc.parallelize(list((i, i % 200) for i in range(0, 1000000)))

schema = StructType([StructField("VisitorID", IntegerType(), True),
                     StructField("visitor_partition", IntegerType(), True)])

inputdf = inputRdd.toDF(schema)

# Write the data bucketed into 500 buckets by VisitorID
inputdf.write.bucketBy(500, "VisitorID").saveAsTable("bucketed_table")

# Read the bucketed table twice and join on the bucketing column;
# both sides share the same bucketing, so no exchange is needed
inputDf1 = spark.sql("select * from bucketed_table")
inputDf2 = spark.sql("select * from bucketed_table")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"), col("df1.VisitorID") == col("df2.VisitorID"))

Sometimes the Spark query optimizer will still choose a broadcast exchange, so for this example let's disable automatic broadcasting:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

The physical plan then looks like this:

== Physical Plan ==
*(3) SortMergeJoin [VisitorID#351], [VisitorID#357], Inner
:- *(1) Sort [VisitorID#351 ASC NULLS FIRST], false, 0
:  +- *(1) Project [VisitorID#351, visitor_partition#352]
:     +- *(1) Filter isnotnull(VisitorID#351)
:        +- *(1) FileScan parquet default.bucketed_6[VisitorID#351,visitor_partition#352] Batched: true, DataFilters: [isnotnull(VisitorID#351)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500
+- *(2) Sort [VisitorID#357 ASC NULLS FIRST], false, 0
   +- *(2) Project [VisitorID#357, visitor_partition#358]
      +- *(2) Filter isnotnull(VisitorID#357)
         +- *(2) FileScan parquet default.bucketed_6[VisitorID#357,visitor_partition#358] Batched: true, DataFilters: [isnotnull(VisitorID#357)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500
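
To check for yourself that no exchange appears, you can print the plan of the join built above:

# With both sides bucketed on VisitorID, the plan should contain no Exchange
# operator, only FileScan, Sort and SortMergeJoin as shown above.
inputDf3.explain()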

Doing something like the following:

inputdf.write.partitionBy("visitor_partition").saveAsTable("partitionBy_2")

This does indeed create a directory structure with one folder per partition. However, it does not help here, because the Spark join is based on the hash of the join key and cannot take advantage of your custom directory structure.
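
One quick way to see the difference, assuming the partitionBy_2 table written above and broadcasting still disabled:

# Self-join the directory-partitioned table on VisitorID; unlike the bucketed
# table, explain() here shows an Exchange hashpartitioning step on both sides.
dfA = spark.table("partitionBy_2").alias("a")
dfB = spark.table("partitionBy_2").alias("b")
dfA.join(dfB, col("a.VisitorID") == col("b.VisitorID")).explain()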

I misunderstood your example at first; I believe you were describing something like partitionBy, not repartition as mentioned in the previous version of this answer.
