How to improve broadcast join speed with a between condition in Spark
Question
I have two dataframes, A and B. A is large (100 GB) and B is relatively small (100 MB). A has 8 partitions and B has 1.
A.join(broadcast(B), $"cur" >= $"low" && $"cur" <= $"high", "left_outer")
This is pretty slow (> 10 hours).
But if I change the join condition to:
A.join(broadcast(B), $"cur" === $"low", "left_outer")
It becomes extremely fast (< 30 minutes), but the condition cannot be changed.
So is there any way to further improve the join speed with my original join condition?
Answer
The trick is to rewrite the join condition so it contains an = component that can be used to optimize the query and narrow down the possible matches. For numeric values, you bucketize your data and use the buckets in the join condition.
Let's say your data looks like this:
val a = spark.range(100000)
.withColumn("cur", (rand(1) * 1000).cast("bigint"))
val b = spark.range(100)
.withColumn("low", (rand(42) * 1000).cast("bigint"))
.withColumn("high", ($"low" + rand(-42) * 10).cast("bigint"))
First choose a bucket size appropriate for your data. In this case we can use 50:
val bucketSize = 50L
Assign a bucket to each row from a:
val aBucketed = a.withColumn(
"bucket", ($"cur" / bucketSize).cast("bigint") * bucketSize
)
Create a UDF which will emit the buckets for a range:
def get_buckets(bucketSize: Long) =
udf((low: Long, high: Long) => {
val min = (low / bucketSize) * bucketSize
val max = (high / bucketSize) * bucketSize
(min to max by bucketSize).toSeq
})
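To sanity-check the bucket arithmetic, the same logic can be run as plain Scala outside Spark (the sample values below are made up for illustration):

```scala
// Plain-Scala version of the bucket logic inside the UDF above.
def buckets(bucketSize: Long)(low: Long, high: Long): Seq[Long] = {
  val min = (low / bucketSize) * bucketSize  // floor low to its bucket start
  val max = (high / bucketSize) * bucketSize // floor high to its bucket start
  (min to max by bucketSize).toSeq
}

// With bucketSize = 50, the range [120, 230] touches buckets 100, 150 and 200,
// so that row of b is duplicated once per bucket before the join.
val bs = buckets(50L)(120L, 230L)
// bs == Seq(100L, 150L, 200L)
```

Each row of b is replicated once per overlapping bucket, which is the cost paid for getting an equi-join key.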
and bucketize b:
val bBucketed = b.withColumn(
"bucket", explode(get_buckets(bucketSize)($"low", $"high"))
)
Use the bucket in the join condition:
aBucketed.join(
broadcast(bBucketed),
aBucketed("bucket") === bBucketed("bucket") &&
$"cur" >= $"low" &&
$"cur" <= $"high",
"leftouter"
)
This way Spark will use BroadcastHashJoin:
*BroadcastHashJoin [bucket#184L], [bucket#178L], LeftOuter, BuildRight, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cur#98L, (cast((cast(cur#98L as double) / 50.0) as bigint) * 50) AS bucket#184L]
: +- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
: +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[3, bigint, false]))
+- Generate explode(if ((isnull(low#105L) || isnull(high#109L))) null else UDF(low#105L, high#109L)), true, false, [bucket#178L]
+- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
+- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
+- *Range (0, 100, step=1, splits=Some(8))
instead of BroadcastNestedLoopJoin:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
: +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange IdentityBroadcastMode
+- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
+- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
+- *Range (0, 100, step=1, splits=Some(8))
You can tune the bucket size to balance between precision and data size.
If you don't mind a lower-level solution, then broadcast a sorted sequence with constant-time item access (like Array or Vector) and use a udf with binary search for joining.
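A minimal sketch of that lower-level idea, assuming the ranges from b are non-overlapping and sorted by their lower bound (the names ranges and findRange are made up here; in Spark the Vector would be wrapped in a broadcast variable and findRange in a udf):

```scala
// Ranges collected from b, sorted by low and assumed non-overlapping.
val ranges: Vector[(Long, Long)] = Vector((0L, 9L), (20L, 29L), (50L, 70L))

// Binary search for the last range whose low <= cur, then verify cur <= high.
def findRange(cur: Long): Option[(Long, Long)] = {
  var lo = 0
  var hi = ranges.length - 1
  var ans = -1
  while (lo <= hi) {
    val mid = (lo + hi) / 2
    if (ranges(mid)._1 <= cur) { ans = mid; lo = mid + 1 }
    else hi = mid - 1
  }
  if (ans >= 0 && cur <= ranges(ans)._2) Some(ranges(ans)) else None
}
```

Each lookup is O(log n) per row of A, instead of the per-row scan over every range that BroadcastNestedLoopJoin effectively performs.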
You should also take a look at the number of partitions. 8 partitions for 100 GB seems pretty low.
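For instance, a common rule of thumb (an assumption here, not part of the answer) is to target roughly 128 MB per partition:

```scala
// Rough partition-count estimate for the 100 GB input A,
// targeting ~128 MB per partition (a common rule of thumb, not a hard rule).
val inputBytes    = 100L * 1024 * 1024 * 1024 // 100 GB
val targetBytes   = 128L * 1024 * 1024        // 128 MB
val numPartitions = (inputBytes / targetBytes).toInt
// numPartitions == 800; then something like A.repartition(numPartitions) before the join
```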
See also:
- SPARK-8682 - Range Join for Spark SQL
- SPARK-22947 - SPIP: as-of join in Spark SQL