Joining Spark DataFrames on a nearest key condition

Question

What’s a performant way to do fuzzy joins in PySpark?

I am looking for the community's views on a scalable approach to joining large Spark DataFrames on a nearest key condition. Allow me to illustrate this problem by means of a representative example. Suppose we have the following Spark DataFrame containing events occurring at some point in time:

ddf_event = spark.createDataFrame(
    data=[
        [1, 'A'],
        [5, 'A'],
        [10, 'B'],
        [15, 'A'],
        [20, 'B'],
        [25, 'B'],
        [30, 'A']
    ],
    schema=['ts_event', 'event']
)

and the following Spark DataFrame containing GPS data measured at some point in time:

ddf_gps = spark.createDataFrame(
    data=[
        [2, '(-46.84635, 173.13674)'],
        [4, '(2.50362, 104.34136)'],
        [8, '(-24.20741, 51.80755)'],
        [15, '(-59.07798, -20.49141)'],
        [18, '(-44.34468, -167.90401)'],
        [24, '(-18.84175, 16.68628)'],
        [27, '(20.48501,58.42423)']
    ],
    schema=['ts_gps', 'gps_coordinates']
)

which we would like to join to produce the following resulting DataFrame:

+--------+-----+------+-----------------------+
|ts_event|event|ts_gps|gps_coordinates        |
+--------+-----+------+-----------------------+
|1       |A    |2     |(-46.84635, 173.13674) |
|5       |A    |4     |(2.50362, 104.34136)   |
|10      |B    |8     |(-24.20741, 51.80755)  |
|15      |A    |15    |(-59.07798, -20.49141) |
|20      |B    |18    |(-44.34468, -167.90401)|
|25      |B    |24    |(-18.84175, 16.68628)  |
|30      |A    |27    |(20.48501,58.42423)    |
+--------+-----+------+-----------------------+

This effectively means finding, for each event, the GPS data point whose timestamp is nearest to the event timestamp.

We thus run into the problem of joining on a nearest key condition, 'nearest' in this case being defined as the smallest absolute difference between timestamps.

I've explored two approaches to achieve this: one based on a filtered binned join (FBJ) and one based on a filtered sorted union (FSU). Both approaches are described below in more detail.

The FBJ approach depends on a parameter bin_size, which limits the time window in which a matching GPS timestamp may be found. Increasing bin_size increases the computational load; decreasing it decreases the outcome quality, since an event and its nearest GPS point can fall into different bins and then never be compared.

Neither approach appears to scale linearly with the size of the input DataFrames.

In practice I have to deal with input data consisting of tens of millions of rows, so I am currently at a loss for a viable solution to this problem.

The FBJ approach consists of the following steps:

  1. Create a ts_bin column, binning the timestamp columns, implemented by:

from pyspark.sql import functions as F

bin_size = 10

# Assign each timestamp to a bin of width bin_size.
ddf_event = ddf_event.withColumn(
    'ts_bin',
    F.round(F.col('ts_event') / bin_size)
)

ddf_gps = ddf_gps.withColumn(
    'ts_bin',
    F.round(F.col('ts_gps') / bin_size)
)

  2. Join the DataFrames on the ts_bin column, implemented by:

ddf = ddf_event.join(ddf_gps, 'ts_bin', 'left_outer')

  3. Determine the minimum timestamp difference, implemented by:

from pyspark.sql.window import Window

window = Window.partitionBy('ts_event')

ddf = ddf.withColumn(
    'ts_diff',
    F.abs(F.col('ts_gps') - F.col('ts_event'))
)

ddf = ddf.withColumn(
    'min_ts_diff',
    F.min(F.col('ts_diff')).over(window)
)

  4. Filter and select the relevant rows and columns, implemented by:

ddf = (
    ddf
    .where(
        # Keep the closest GPS row per event; the isNull branch retains events
        # whose bin contained no GPS rows at all (from the left outer join).
        (F.col('ts_diff') == F.col('min_ts_diff')) |
        (F.col('ts_diff').isNull())
    )
    .select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)
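
One caveat with the min-over-window filter above: if two GPS points happen to be exactly equidistant from an event, both rows survive and the event is duplicated. A minimal sketch of a deterministic alternative for step 4 (my own variation, not part of the original approach), applied to the DataFrame as it stands after step 3 and breaking ties on the smaller GPS timestamp via row_number:

rank_window = Window.partitionBy('ts_event').orderBy(
    F.col('ts_diff').asc(), F.col('ts_gps').asc())

ddf = (
    ddf
    # Keep exactly one GPS row per event (rank 1 = smallest ts_diff).
    .withColumn('rank', F.row_number().over(rank_window))
    .where(F.col('rank') == 1)
    .select('ts_event', 'event', 'ts_gps', 'gps_coordinates')
)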

Limiting cases of bin_size:

  • bin_size >> 1 effectively results in a full cross-join
  • bin_size = 1 effectively results in a left-join on ts_event == ts_gps

The FSU approach consists of the following steps:

  1. Union the DataFrames, implemented by:

def union(df1, df2):
    # Align the two schemas, adding any missing column as nulls, so that the
    # position-based union below lines the columns up correctly.
    cols = list(set(df1.columns).union(set(df2.columns)))
    for col in cols:
        if col not in df1.columns:
            df1 = df1.withColumn(col, F.lit(None))
        if col not in df2.columns:
            df2 = df2.withColumn(col, F.lit(None))
    return df1.select(cols).union(df2.select(cols))

ddf_event = ddf_event.withColumn('timestamp', F.col('ts_event'))
ddf_gps = ddf_gps.withColumn('timestamp', F.col('ts_gps'))
ddf = union(ddf_event, ddf_gps)

  2. Sort the resulting DataFrame and fetch the adjacent GPS timestamps, implemented by:

from sys import maxsize

last_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(-maxsize, 0)
first_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(0, maxsize)

ddf = (
    ddf.withColumn(
        'prev_time',
        F.last(F.col('ts_gps'), ignorenulls=True)
         .over(last_window)
    ).withColumn(
        'prev_coordinates',
        F.last(F.col('gps_coordinates'), ignorenulls=True)
         .over(last_window)
    ).withColumn(
        'next_time',
        F.first(F.col('ts_gps'), ignorenulls=True)
         .over(first_window)
    ).withColumn(
        'next_coordinates',
        F.first(F.col('gps_coordinates'), ignorenulls=True)
         .over(first_window)
    )
)
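
A side note on the frame specification above: the same unbounded frames can be written with Spark's built-in window boundary constants instead of sys.maxsize, e.g.:

# Equivalent frame definitions using Spark's window boundary constants.
last_window = (Window.orderBy(F.col('timestamp').asc())
               .rowsBetween(Window.unboundedPreceding, Window.currentRow))
first_window = (Window.orderBy(F.col('timestamp').asc())
                .rowsBetween(Window.currentRow, Window.unboundedFollowing))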

  3. Filter and select the relevant rows and columns, implemented by:

# True when the preceding GPS record is closer to the event than the following one.
condition = (F.col('timestamp') - F.col('prev_time')
             < F.col('next_time') - F.col('timestamp'))

ddf = (
    ddf
    .where(F.col('event').isNotNull())
    .withColumn(
        'ts_gps',
        F.when(condition | F.col('next_time').isNull(), F.col('prev_time')).otherwise(F.col('next_time'))
    ).withColumn(
        'gps_coordinates',
        F.when(condition | F.col('next_time').isNull(),
               F.col('prev_coordinates'))
         .otherwise(F.col('next_coordinates'))
    ).select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)
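
As a quick sanity check on the example data, displaying the result ordered by the event timestamp should reproduce the expected table from the beginning of the question:

# On the example DataFrames above, this should match the expected result table.
ddf.orderBy('ts_event').show(truncate=False)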

Answer

What you are looking for is a temporal join. Check out the time series Spark library Flint (formerly HuoHua, Spark in Chinese): https://github.com/twosigma/flint

Using this library, for two given Time Series DataFrames (the documentation explains these objects), you can do the following in PySpark (or Scala Spark):

ddf_event = ...
ddf_gps = ...
result = ddf_event.leftJoin(ddf_gps, tolerance = "1day")
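
For context, a minimal sketch of how the two DataFrames above might be wrapped as Flint time-series DataFrames before the join. This is an assumption based on Flint's README rather than the answerer's code: the ts.flint package, the FlintContext construction, the read.dataframe call, the expectation of a sorted 'time' column, and the tolerance value are all details to verify against the Flint documentation for your version:

from ts.flint import FlintContext

# Assumes an existing SQLContext/SparkSession; Flint wraps it in a FlintContext.
flintContext = FlintContext(sqlContext)

# Flint expects a time column (by default named 'time'), sorted ascending (assumption).
flint_event = flintContext.read.dataframe(
    ddf_event.withColumnRenamed('ts_event', 'time').orderBy('time'))
flint_gps = flintContext.read.dataframe(
    ddf_gps.withColumnRenamed('ts_gps', 'time').orderBy('time'))

# leftJoin matches the most recent right-hand row within the tolerance;
# futureLeftJoin looks forward in time instead.
result = flint_event.leftJoin(flint_gps, tolerance='1day')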

Your timestamps were not clear, so set tolerance according to your needs. You can also do 'future joins' if needed.

Check out their Spark Summit presentation for more explanation and examples: https://youtu.be/g8o5-2lLcvQ
