How do you create merge_asof functionality in PySpark?

Question

Table A has many columns along with a date column; Table B has a datetime and a value. The data in both tables is generated sporadically, with no regular interval. Table A is small, Table B is massive.

I need to join B to A under the condition that a given element a of A.datetime corresponds to

B[B['datetime'] <= a]['datetime'].max()
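
For reference, in plain pandas this condition is exactly a backward merge_asof; a minimal sketch, assuming A and B are pandas DataFrames and using the Datetime column name from the sample data below:

import pandas as pd

# Backward asof-join: for each A row, pick the B row with the greatest
# B.Datetime that is still <= A.Datetime. Both frames must be sorted on the key.
result = pd.merge_asof(
    A.sort_values('Datetime'),
    B.sort_values('Datetime'),
    on='Datetime',
    direction='backward',
)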

There are a couple ways to do this, but I would like the most efficient way.

Broadcast the small dataset as a pandas DataFrame. Set up a Spark UDF that creates a pandas DataFrame for each row and merges it with the large dataset using merge_asof.
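
One way this first option might look is with mapInPandas; this is only a sketch under some assumptions: df_a and df_b are placeholder names for Spark DataFrames holding tables A and B, and Datetime is a timestamp column so it arrives in pandas as datetime64[ns], which merge_asof requires. Since each partition of B sees only part of the data, the per-partition matches still have to be reduced to the globally latest one per A row:

import pandas as pd
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Ship the small table A to every executor as a pandas DataFrame.
a_pdf = df_a.toPandas().sort_values('Datetime')
a_bc = spark.sparkContext.broadcast(a_pdf)

def asof_per_partition(batches):
    # Each batch is a pandas DataFrame holding one slice of the large table B.
    for b_pdf in batches:
        b_pdf = (b_pdf.rename(columns={'Datetime': 'B_Datetime'})
                      .sort_values('B_Datetime'))
        merged = pd.merge_asof(a_bc.value, b_pdf,
                               left_on='Datetime', right_on='B_Datetime',
                               direction='backward')
        # Drop A rows that found no match in this slice of B.
        yield merged.dropna(subset=['Key'])

candidates = df_b.mapInPandas(
    asof_per_partition,
    schema='Column1 string, Datetime timestamp, Key long, B_Datetime timestamp')

# Each partition only produced a local candidate, so keep the globally
# latest B_Datetime per A row.
w = (Window.partitionBy('Column1', 'Datetime')
           .orderBy(f.col('B_Datetime').desc()))
result = (candidates
          .withColumn('rn', f.row_number().over(w))
          .filter('rn = 1')
          .select('Column1', 'Key'))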

Use the broadcast join functionality of Spark SQL: set up a theta join on the following condition

B['datetime'] <= A['datetime']

Then eliminate all the superfluous rows.
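
For what it's worth, a sketch of what this second option could look like (df_a and df_b are again placeholder names for tables A and B; the "elimination" step keeps, per A row, only the match with the greatest B datetime):

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Broadcast the small table A and do the non-equi (theta) join.
joined = (f.broadcast(df_a.alias('a'))
          .join(df_b.alias('b'), f.col('b.Datetime') <= f.col('a.Datetime')))

# Eliminate the superfluous rows: per A row, keep only the match with
# the greatest B.Datetime.
w = (Window.partitionBy(f.col('a.Column1'), f.col('a.Datetime'))
           .orderBy(f.col('b.Datetime').desc()))
result = (joined
          .withColumn('rn', f.row_number().over(w))
          .filter('rn = 1')
          .select(f.col('a.Column1').alias('Column1'), f.col('b.Key').alias('Key')))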

Option B seems pretty terrible... but please let me know if the first way is efficient or if there is another way.

Here is the sample input and expected output:

A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
|    A    |2019-02-03|
|    B    |2019-03-14|
+---------+----------+

B =
+---------+----------+
|   Key   | Datetime |
+---------+----------+
|    0    |2019-01-01|
|    1    |2019-01-15|
|    2    |2019-02-01|
|    3    |2019-02-15|
|    4    |2019-03-01|
|    5    |2019-03-15|
+---------+----------+

custom_join(A,B) =
+---------+----------+
| Column1 |   Key    |
+---------+----------+
|    A    |     2    |
|    B    |     4    |
+---------+----------+
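
To reproduce this, the two sample tables can be built roughly as follows; the names df1 (table A) and df2 (table B) are the ones used in the answer's code below, and the dates are kept as ISO strings, which compare correctly as text:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Table A -> df1, Table B -> df2 (the names used in the answer below).
df1 = spark.createDataFrame(
    [('A', '2019-02-03'), ('B', '2019-03-14')],
    ['Column1', 'Datetime'])

df2 = spark.createDataFrame(
    [(0, '2019-01-01'), (1, '2019-01-15'), (2, '2019-02-01'),
     (3, '2019-02-15'), (4, '2019-03-01'), (5, '2019-03-15')],
    ['Key', 'Datetime'])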

Answer

I doubt that it is faster, but you could solve it with Spark by using union and last together with a window function.

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Align the schemas so the two frames can be unioned:
# A rows get a null Key, B rows get a null Column1.
df1 = df1.withColumn('Key', f.lit(None))
df2 = df2.withColumn('Column1', f.lit(None))

df3 = df1.unionByName(df2)

# For each row, look only at the rows strictly before it in Datetime order
# and take the last non-null Key (ignorenulls=True), i.e. the key of the
# latest B row with Datetime <= this row's Datetime; then keep only A rows.
w = Window.orderBy('Datetime', 'Column1').rowsBetween(Window.unboundedPreceding, -1)
df3.withColumn('Key', f.last('Key', True).over(w)).filter(~f.isnull('Column1')).show()

which gives:

+-------+----------+---+
|Column1|  Datetime|Key|
+-------+----------+---+
|      A|2019-02-03|  2|
|      B|2019-03-14|  4|
+-------+----------+---+

It's an old question but maybe still useful for somebody.
