Spark SQL performance - JOIN on value BETWEEN min and max

Problem description

I have two files:

  1. IP ranges - country lookup
  2. A list of requests coming from different IPs

The IPs are stored as integers (using inet_aton()).
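
For reference, this is roughly the conversion inet_aton() performs; a minimal Python sketch (the helper name ip_to_int is illustrative, not from the original post):

import socket
import struct

def ip_to_int(ip):
    """Dotted-quad IPv4 string -> unsigned 32-bit integer (MySQL INET_ATON style)."""
    return struct.unpack("!I", socket.inet_aton(ip))[0]

ip_to_int("192.168.0.1")  # 3232235521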

I tried using Spark SQL to join these pieces of data by loading both files into dataframes and registering them as temp tables.

GeoLocTable - ipstart, ipend, ...additional Geo location data
Recordstable - INET_ATON, ...3 more fields
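
For context, the setup was probably something like the sketch below; the file paths, format options, and the extra columns are placeholders, and on Spark 1.x you would call registerTempTable() on a SQLContext instead of createOrReplaceTempView():

geo_df = spark.read.csv("geoloc.csv", header=True, inferSchema=True)        # ipstart, ipend, geo columns
records_df = spark.read.csv("records.csv", header=True, inferSchema=True)   # INET_ATON plus 3 more fields

geo_df.createOrReplaceTempView("GeoLocTable")
records_df.createOrReplaceTempView("Recordstable")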

I tried using Spark SQL to join these pieces of data using a SQL statement like so -

"select a.*, b.* from Recordstable a left join GeoLocTable b on a.INET_ATON between b.ipstart and b.ipend"

There are about 850K records in RecordsTable and about 2.5M records in GeoLocTable. The join as it exists runs for about 2 hours with about 20 executors.

I have tried caching and broadcasting the GeoLocTable but it does not really seem to help. I have bumped up spark.sql.autoBroadcastJoinThreshold=300000000 and spark.sql.shuffle.partitions=600.
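
For reference, a sketch of how those settings and an explicit broadcast hint can be applied (Spark 2.x API shown; on 1.x the configs are set through sqlContext.setConf(), and geo_df / records_df are the placeholder DataFrames from the sketch above):

from pyspark.sql.functions import broadcast

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 300000000)
spark.conf.set("spark.sql.shuffle.partitions", 600)

# Even with an explicit broadcast hint, the range predicate has no equality part,
# so the join still ends up as a BroadcastNestedLoopJoin (as seen in the Spark UI).
joined = records_df.join(
    broadcast(geo_df),
    (records_df.INET_ATON >= geo_df.ipstart) & (records_df.INET_ATON <= geo_df.ipend),
    "left"
)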

Spark UI shows a BroadcastNestedLoopJoin being performed. Is this the best I should be expecting? I tried searching for conditions where this type of join would be performed but the documentation seems sparse.

PS - I am using PySpark to work with Spark.

Recommended answer

The source of the problem is pretty simple. When you execute a join whose condition is not equality-based, the only thing Spark can do right now is expand it to a Cartesian product followed by a filter, which is pretty much what happens inside BroadcastNestedLoopJoin. So logically you have one huge nested loop that tests all 850K * 2.5M record combinations.
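
You can see this directly in the physical plan; a quick check using the toy DataFrames defined just below:

records_table.join(
    geo_loc_table,
    (records_table.inet >= geo_loc_table.ipstart) & (records_table.inet <= geo_loc_table.ipend),
    "left"
).explain()
# The plan contains a BroadcastNestedLoopJoin node because the condition has no equality predicate.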

This approach is obviously extremely inefficient. Since it looks like the lookup table fits into memory, the simplest improvement is to use a local, sorted data structure instead of a Spark DataFrame. Assuming your data looks like this:

geo_loc_table = sc.parallelize([
    (1, 10, "foo"), (11, 36, "bar"), (37, 59, "baz"),
]).toDF(["ipstart", "ipend", "loc"])

records_table = sc.parallelize([
    (1,  11), (2, 38), (3, 50)
]).toDF(["id", "inet"])

We can project and sort the reference data by ipstart and create a broadcast variable:

geo_start_bd = sc.broadcast(geo_loc_table
  .select("ipstart")
  .orderBy("ipstart")
  .rdd.flatMap(lambda x: x)  # flatten Row objects into plain values (works on Spark 1.x and 2.x)
  .collect())

Next we'll use a UDF and the bisect module to augment records_table:

from bisect import bisect_right
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# https://docs.python.org/3/library/bisect.html#searching-sorted-lists
def find_le(x):
    'Find rightmost value less than or equal to x'
    i = bisect_right(geo_start_bd.value, x)
    if i:
        return geo_start_bd.value[i-1]
    return None

# Attach the greatest ipstart <= inet to every record
records_table_with_ipstart = records_table.withColumn(
    "ipstart", udf(find_le, LongType())("inet")
)

and finally join both datasets:

 records_table_with_ipstart.join(geo_loc_table, ["ipstart"], "left")
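
With the toy data above, each record resolves to the expected range (id 1 -> "bar", ids 2 and 3 -> "baz"). If your ranges can have gaps, an extra guard on ipend prevents an inet that lies past the end of the preceding range from matching it; note that this filter is not part of the original answer:

from pyspark.sql.functions import col

result = records_table_with_ipstart.join(geo_loc_table, ["ipstart"], "left")

# Optional guard, assuming ranges may have gaps; keep unmatched rows (null ipend) as well.
result = result.where(col("ipend").isNull() | (col("inet") <= col("ipend")))
result.show()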
