Spark SQL performance - JOIN on value BETWEEN min and max


Question

I have two files in which I store:

  1. IP range - country lookup
  2. A list of requests from different IPs

The IPs are stored as integers (using inet_aton()).

I tried using Spark SQL to join these pieces of data by loading both files into dataframes and registering them as temp tables.

GeoLocTable - ipstart, ipend, ...additional Geo location data
Recordstable - INET_ATON, ...3 more fields

I tried using Spark SQL to join these pieces of data using a SQL statement like so -

"select a.*, b.* from Recordstable a left join GeoLocTable b on a.INET_ATON between b.ipstart and b.ipend"

There are about 850K records in RecordsTable and about 2.5M records in GeoLocTable. The join as it exists runs for about 2 hours with about 20 executors.

I have tried caching and broadcasting the GeoLocTable but it does not really seem to help. I have bumped up spark.sql.autoBroadcastJoinThreshold=300000000 and spark.sql.shuffle.partitions=600.
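
For reference, the settings were bumped along these lines (a sketch; sqlContext stands for whatever SQLContext/HiveContext is in use, and the same values can equally be passed via spark-submit --conf):

# Applying the configuration values mentioned above
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "300000000")
sqlContext.setConf("spark.sql.shuffle.partitions", "600")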

Spark UI shows a BroadcastNestedLoopJoin being performed. Is this the best I should be expecting? I tried searching for conditions where this type of join would be performed but the documentation seems sparse.

PS - I am using PySpark to work with Spark.

Answer

The source of the problem is pretty simple. When you execute a join and the join condition is not equality based, the only thing Spark can do right now is expand it to a Cartesian product followed by a filter, which is pretty much what happens inside BroadcastNestedLoopJoin. So logically you have this huge nested loop which tests all 850K * 2.5M record pairs.
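
You can see this in the physical plan; a quick sketch (df_a and df_b are illustrative names for the two registered tables):

# Illustrative names: df_a / df_b stand for Recordstable / GeoLocTable
joined = df_a.join(
    df_b,
    (df_a["INET_ATON"] >= df_b["ipstart"]) & (df_a["INET_ATON"] <= df_b["ipend"]),
    "left"
)
joined.explain()  # the plan shows BroadcastNestedLoopJoin (nested loop over all pairs)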

This approach is obviously extremely inefficient. Since it looks like the lookup table fits into memory, the simplest improvement is to use a local, sorted data structure instead of a Spark DataFrame. Assuming your data looks like this:

geo_loc_table = sc.parallelize([
    (1, 10, "foo"), (11, 36, "bar"), (37, 59, "baz"),
]).toDF(["ipstart", "ipend", "loc"])

records_table = sc.parallelize([
    (1,  11), (2, 38), (3, 50)
]).toDF(["id", "inet"])

We can project and sort the reference data by ipstart and create a broadcast variable:

# Note: on Spark 2.x+ DataFrames no longer expose flatMap; use .rdd.flatMap there
geo_start_bd = sc.broadcast(geo_loc_table
  .select("ipstart")
  .orderBy("ipstart")
  .flatMap(lambda x: x)
  .collect())

Next we'll use a UDF and the bisect module to augment records_table:

from bisect import bisect_right
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# https://docs.python.org/3/library/bisect.html#searching-sorted-lists
def find_le(x):
    'Find rightmost value less than or equal to x'
    i = bisect_right(geo_start_bd.value, x)
    if i:
        return geo_start_bd.value[i-1]
    return None

records_table_with_ipstart = records_table.withColumn(
    "ipstart", udf(find_le, LongType())("inet")
)

Finally, join both datasets:

records_table_with_ipstart.join(geo_loc_table, ["ipstart"], "left")
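
From there the result can be inspected or written out as usual, for example (a quick usage sketch):

result = records_table_with_ipstart.join(geo_loc_table, ["ipstart"], "left")
result.select("id", "inet", "ipstart", "loc").show()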
