How to improve broadcast join speed with a between condition in Spark

Problem Description

I have two dataframes, A and B. A is large (100 GB) and B is relatively small (100 MB). A has 8 partitions and B has 1 partition.

A.join(broadcast(B), $"cur" >= $"low" &&  $"cur" <= $"high", "left_outer")

The speed is pretty slow (> 10 hours).

But if I change the join condition to:

A.join(broadcast(B), $"cur" === $"low" , "left_outer")

It becomes extremely fast (< 30 minutes). But the condition cannot be changed.

So is there any way to further improve the join speed while keeping my original join condition?

Recommended Answer

The trick is to rewrite the join condition so it contains an equality (=) component that can be used to optimize the query and narrow down the possible matches. For numeric values, you bucketize your data and use the buckets as part of the join condition.

Let's say your data looks like this:

import org.apache.spark.sql.functions._
import spark.implicits._

// Example data: a is the large side, b is the small side with [low, high] ranges.
val a = spark.range(100000)
  .withColumn("cur", (rand(1) * 1000).cast("bigint"))

val b = spark.range(100)
  .withColumn("low", (rand(42) * 1000).cast("bigint"))
  .withColumn("high", ($"low" + rand(-42) * 10).cast("bigint"))

First choose a bucket size appropriate for your data. In this case we can use 50:

val bucketSize = 50L

Assign a bucket to each row of a:

// Truncate cur down to the start of its bucket.
val aBucketed = a.withColumn(
  "bucket", ($"cur" / bucketSize).cast("bigint") * bucketSize
)

Create a UDF that emits the buckets for a range:

// Emit every bucket that the [low, high] interval overlaps.
def get_buckets(bucketSize: Long) =
  udf((low: Long, high: Long) => {
    val min = (low / bucketSize) * bucketSize
    val max = (high / bucketSize) * bucketSize
    (min to max by bucketSize).toSeq
  })

and bucket b:

// Each row of b becomes one row per bucket its range overlaps.
val bBucketed = b.withColumn(
  "bucket", explode(get_buckets(bucketSize)($"low", $"high"))
)
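
If you want to see what the expansion did, you can inspect a few rows of bBucketed (the exact values depend on the random seeds used above):

// Each original range now appears once per overlapping bucket.
bBucketed.select($"low", $"high", $"bucket").orderBy($"low", $"bucket").show(5, false)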

Use the bucket in the join condition:

aBucketed.join(
  broadcast(bBucketed),
  aBucketed("bucket") === bBucketed("bucket") &&
    $"cur" >= $"low" &&
    $"cur" <= $"high",
  "leftouter"
)

This way Spark will use BroadcastHashJoin:

*BroadcastHashJoin [bucket#184L], [bucket#178L], LeftOuter, BuildRight, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cur#98L, (cast((cast(cur#98L as double) / 50.0) as bigint) * 50) AS bucket#184L]
:  +- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
:     +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[3, bigint, false]))
   +- Generate explode(if ((isnull(low#105L) || isnull(high#109L))) null else UDF(low#105L, high#109L)), true, false, [bucket#178L]
      +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
         +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
            +- *Range (0, 100, step=1, splits=Some(8))

instead of BroadcastNestedLoopJoin:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
:  +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange IdentityBroadcastMode
   +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
      +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
         +- *Range (0, 100, step=1, splits=Some(8))

You can tune the bucket size to balance between precision and data size.
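
If you want to experiment with different sizes, one way is to wrap the bucketing and the join in a small helper. This is a minimal sketch that reuses get_buckets and the column names from the example above (bucketedLeftJoin is just an illustrative name):

import org.apache.spark.sql.DataFrame

// Sketch only: bucketize both sides for the given bucketSize, then run
// the same broadcast left outer join as above.
def bucketedLeftJoin(a: DataFrame, b: DataFrame, bucketSize: Long): DataFrame = {
  val aB = a.withColumn("bucket", ($"cur" / bucketSize).cast("bigint") * bucketSize)
  val bB = b.withColumn("bucket", explode(get_buckets(bucketSize)($"low", $"high")))
  aB.join(
    broadcast(bB),
    aB("bucket") === bB("bucket") && $"cur" >= $"low" && $"cur" <= $"high",
    "leftouter"
  )
}

// Try a candidate size and check that the plan still shows BroadcastHashJoin:
bucketedLeftJoin(a, b, 100L).explain()

Smaller buckets reject more non-matching rows in the hash join, but explode b into more rows, so the broadcast side grows.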

If you don't mind a lower-level solution, you can broadcast a sorted sequence with constant-time item access (like an Array or a Vector) and use a udf that performs a binary search to do the join.
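
A minimal sketch of that idea, assuming for simplicity that the [low, high] ranges in b do not overlap and using b's id column as the value to attach (rangesBc, findRange and b_id are just illustrative names):

// Collect the small side once, sorted by the lower bound.
// Assumes b has columns low, high and id (as in the example above) and
// that the ranges do not overlap.
val ranges: Array[(Long, Long, Long)] = b
  .select($"low", $"high", $"id")
  .as[(Long, Long, Long)]
  .collect()
  .sortBy(_._1)

val rangesBc = spark.sparkContext.broadcast(ranges)

// Binary search for the range containing cur; returns the matching id,
// or None (null in the result) when no range contains it.
val findRange = udf((cur: Long) => {
  val rs = rangesBc.value
  var lo = 0
  var hi = rs.length - 1
  var result: Option[Long] = None
  while (lo <= hi && result.isEmpty) {
    val mid = lo + (hi - lo) / 2
    val (l, h, id) = rs(mid)
    if (cur < l) hi = mid - 1
    else if (cur > h) lo = mid + 1
    else result = Some(id)
  }
  result
})

// Same effect as the left outer join: every row of a is kept and gets
// the id of the matching range from b, or null when there is none.
val joined = a.withColumn("b_id", findRange($"cur"))

This avoids the explode entirely: each row of a does a single O(log n) lookup against the broadcast array.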

You should also take a look at the number of partitions; 8 partitions for 100 GB seems pretty low.
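
For example (the number below is purely illustrative; a common rule of thumb is to aim for partitions in the low hundreds of megabytes, which for ~100 GB means hundreds of partitions rather than 8):

// Illustrative only: repartition the large side so each task processes a
// reasonably sized chunk instead of roughly 12 GB per partition.
val aRepartitioned = A.repartition(800)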

See also:

Spark SQL performance - JOIN on value BETWEEN min and max
