How do you create merge_asof functionality in PySpark?
Problem description
Table A has many columns with a date column, Table B has a datetime and a value. The data in both tables are generated sporadically with no regular interval. Table A is small, Table B is massive.
I need to join B to A under the condition that a given element a of A.datetime corresponds to

B[B['datetime'] <= a]['datetime'].max()
There are a couple ways to do this, but I would like the most efficient way.
Option A: Broadcast the small dataset as a pandas DataFrame. Set up a Spark UDF that creates a pandas DataFrame for each row and merges it with the large dataset using merge_asof.
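One possible reading of this idea, as a rough sketch: collect the small table to pandas (it is effectively broadcast to the executors with the task closure), run merge_asof against each pandas chunk of the large table via Spark 3.x's mapInPandas, and then reduce the per-chunk candidates. The names dfA, dfB, asof_per_partition and B_Datetime are illustrative, not from the original post:

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Small table A; referencing it inside the function ships it to the executors.
a_pdf = dfA.toPandas()
a_pdf['Datetime'] = pd.to_datetime(a_pdf['Datetime'])
a_pdf = a_pdf.sort_values('Datetime')

def asof_per_partition(batches):
    # Each batch is a pandas chunk of the large table B.
    for b_pdf in batches:
        b_pdf = b_pdf.rename(columns={'Datetime': 'B_Datetime'})
        b_pdf['B_Datetime'] = pd.to_datetime(b_pdf['B_Datetime'])
        merged = pd.merge_asof(a_pdf, b_pdf.sort_values('B_Datetime'),
                               left_on='Datetime', right_on='B_Datetime',
                               direction='backward')
        merged = merged.dropna(subset=['Key'])
        merged['Key'] = merged['Key'].astype('int64')
        yield merged[['Column1', 'Datetime', 'B_Datetime', 'Key']]

candidates = dfB.mapInPandas(
    asof_per_partition,
    schema='Column1 string, Datetime timestamp, B_Datetime timestamp, Key long')

# Each chunk only sees part of B, so keep the candidate with the latest
# B_Datetime per A row to get the global as-of match.
w = Window.partitionBy('Column1', 'Datetime').orderBy(F.col('B_Datetime').desc())
result = (candidates
          .withColumn('rn', F.row_number().over(w))
          .filter('rn = 1')
          .select('Column1', 'Key'))

Note that A rows with no matching B row anywhere are dropped by this sketch, and the final row_number step is what reconciles the per-chunk candidates.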
Option B: Use the broadcast join functionality of Spark SQL: set up a theta join on the following condition
B['datetime'] <= A['datetime']
Then eliminate all the superfluous rows.
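A sketch of this broadcast theta join, again assuming dfA and dfB hold the two tables (the names are illustrative):

from pyspark.sql import functions as F

# Non-equi ("theta") join: pair every A row with every B row whose Datetime is <= A's.
# Broadcasting the small table avoids shuffling the massive one.
joined = dfB.alias('b').join(
    F.broadcast(dfA.alias('a')),
    on=F.col('b.Datetime') <= F.col('a.Datetime'))

# Eliminate the superfluous rows: for each A row keep only the B row with the
# latest Datetime. max() compares structs field by field, so the latest b.Datetime wins.
result = (joined
          .groupBy(F.col('a.Column1'), F.col('a.Datetime'))
          .agg(F.max(F.struct(F.col('b.Datetime').alias('dt'),
                              F.col('b.Key').alias('Key'))).alias('m'))
          .select('Column1', F.col('m.Key').alias('Key')))

result.show()

The join itself still produces a row for every (A, B) pair that satisfies the inequality before anything is eliminated, which is why this option looks expensive.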
Option B seems pretty terrible... but please let me know if the first way is efficient or if there is another way.
Here is the sample input and expected output:
A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
| A |2019-02-03|
| B |2019-03-14|
+---------+----------+
B =
+---------+----------+
| Key | Datetime |
+---------+----------+
| 0 |2019-01-01|
| 1 |2019-01-15|
| 2 |2019-02-01|
| 3 |2019-02-15|
| 4 |2019-03-01|
| 5 |2019-03-15|
+---------+----------+
custom_join(A,B) =
+---------+----------+
| Column1 | Key |
+---------+----------+
| A | 2 |
| B | 4 |
+---------+----------+
Answer
I doubt that it is faster, but you could solve it with Spark by using union and last together with a window function.
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Give both tables the same schema so they can be unioned:
# A rows get a null Key, B rows get a null Column1.
df1 = df1.withColumn('Key', f.lit(None))
df2 = df2.withColumn('Column1', f.lit(None))
df3 = df1.unionByName(df2)

# For each row, look back over all earlier rows (ordered by Datetime) and take the
# last non-null Key, i.e. the latest B key whose Datetime is <= this row's Datetime.
w = Window.orderBy('Datetime', 'Column1').rowsBetween(Window.unboundedPreceding, -1)
df3.withColumn('Key', f.last('Key', ignorenulls=True).over(w)).filter(~f.isnull('Column1')).show()
which gives
+-------+----------+---+
|Column1| Datetime|Key|
+-------+----------+---+
| A|2019-02-03| 2|
| B|2019-03-14| 4|
+-------+----------+---+
It's an old question but maybe still useful for somebody.