如何通过Spark中的中间条件提高广播加入速度 [英] How to improve broadcast Join speed with between condition in Spark

查看:155
本文介绍了如何通过Spark中的中间条件提高广播加入速度的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个数据帧A和B.A大(100 G),而B相对小(100 M). A的分区号为8,B的分区号为1.

I have two dataframes A and B. A is large (100 G) and B is relatively small (100 M). Partition number of A is 8 and Partition number of B is 1.

A.join(broadcast(B), $"cur" >= $"low" &&  $"cur" <= $"high", "left_outer")

速度非常慢(> 10小时).

The speed is pretty slow (> 10 hours).

但是,如果我将联接条件更改为:

But if I change the join condition to:

A.join(broadcast(B), $"cur" === $"low" , "left_outer")

它变得非常快(<30分钟).但是条件不能改变.

It becomes extremely fast (< 30 minutes). But the condition cannot be changed.

那么有什么方法可以进一步提高我的原始加入条件下的加入速度?

So are there any ways to further improve the join speed on my original join condition?

推荐答案

诀窍是重写join条件,使其包含=组件,该组件可用于优化查询并缩小可能的匹配范围.对于数字值,可以对数据进行存储桶化,并使用存储桶作为联接条件.

The trick is to rewrite join condition so it contains = component which can be used to optimize the query and narrow down possible matches. For numeric values you bucketize your data and use buckets for join condition.

假设您的数据如下所示:

Let's say your data looks like this:

val a = spark.range(100000)
  .withColumn("cur", (rand(1) * 1000).cast("bigint"))

val b = spark.range(100)
  .withColumn("low", (rand(42) * 1000).cast("bigint"))
  .withColumn("high", ($"low" + rand(-42) * 10).cast("bigint"))

首先选择适合您数据的存储桶大小.在这种情况下,我们可以使用50:

First choose a bucket size appropriate for your data. In this case we can use 50:

val bucketSize = 50L

a中的每一行分配存储桶:

Assign bucket for each row from a:

val aBucketed = a.withColumn(
  "bucket", ($"cur" / bucketSize).cast("bigint") * bucketSize
)

创建UDF,该UDF将发出一定范围的存储桶:

Create UDF which will emit buckets for a range:

def get_buckets(bucketSize: Long) = 
  udf((low: Long, high: Long) => {
    val min = (low / bucketSize) * bucketSize
    val max = (high / bucketSize) * bucketSize
    (min to max by bucketSize).toSeq
  })

和存储区b:

val bBucketed = b.withColumn(
  "bucket", explode(get_buckets(bucketSize)($"low",  $"high"))
)

join条件下使用存储桶:

aBucketed.join(
  broadcast(bBucketed), 
  aBucketed("bucket") === bBucketed("bucket") && 
    $"cur" >= $"low" &&  
    $"cur" <= $"high",
  "leftouter"
)

这样,Spark将使用BroadcastHashJoin:

This way Spark will use BroadcastHashJoin:

*BroadcastHashJoin [bucket#184L], [bucket#178L], LeftOuter, BuildRight, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cur#98L, (cast((cast(cur#98L as double) / 50.0) as bigint) * 50) AS bucket#184L]
:  +- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
:     +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[3, bigint, false]))
   +- Generate explode(if ((isnull(low#105L) || isnull(high#109L))) null else UDF(low#105L, high#109L)), true, false, [bucket#178L]
      +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
         +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
            +- *Range (0, 100, step=1, splits=Some(8))

代替BroadcastNestedLoopJoin:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
:  +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange IdentityBroadcastMode
   +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
      +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
         +- *Range (0, 100, step=1, splits=Some(8))

您可以调整存储桶大小以在精度和数据大小之间取得平衡.

You can tune bucket size to balance between precision and data size.

如果您不介意较低级别的解决方案,则broadcast具有恒定项目访问权限的排序序列(例如ArrayVector),然后将udf与二进制搜索结合使用.

If you don't mind a lower level solution then broadcast a sorted sequence with constant item access (like Array or Vector) and use udf with binary search for joining.

您还应该查看分区数. 100GB的8个分区似乎很低.

You should also take a look at the number of partitions. 8 partitions for 100GB seems pretty low.

另请参见:

  • SPARK-8682 - Range Join for Spark SQL
  • SPARK-22947 - SPIP: as-of join in Spark SQL

Spark SQL性能-联接最小和最大之间的值

这篇关于如何通过Spark中的中间条件提高广播加入速度的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆