Joining two spark dataframes on time (TimestampType) in python

Problem description

I have two dataframes and I would like to join them based on one column, with a caveat that this column is a timestamp, and that timestamp has to be within a certain offset (5 seconds) in order to join records. More specifically, a record in dates_df with date=1/3/2015:00:00:00 should be joined with events_df with time=1/3/2015:00:00:01 because both timestamps are within 5 seconds from each other.

I'm trying to get this logic working with python spark, and it is extremely painful. How do people do joins like this in spark?

My approach is to add two extra columns to dates_df that will determine the lower_timestamp and upper_timestamp bounds with a 5 second offset, and perform a conditional join. And this is where it fails, more specifically:

joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)

joined_df.explain()

Captures only the last part of the query:

Filter (time#6 < upper_timestamp#4)
 CartesianProduct
 ....

and it gives me wrong results.

Do I really have to do a full blown cartesian join for each inequality, removing duplicates as I go along?

The full code is below:

from datetime import datetime, timedelta

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf


master = 'local[*]'
app_name = 'stackoverflow_join'

conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

def lower_range_func(x, offset=5):
    return x - timedelta(seconds=offset)

def upper_range_func(x, offset=5):
    return x + timedelta(seconds=offset)


lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())

dates_fields = [StructField("name", StringType(), True), StructField("date", TimestampType(), True)]
dates_schema = StructType(dates_fields)

dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) for x in range(1,5)]
dates_df = sqlContext.createDataFrame(dates, dates_schema)

dates_df.show()

# extend dates_df with time ranges
dates_df = dates_df.withColumn('lower_timestamp', lower_range(dates_df['date'])).\
           withColumn('upper_timestamp', upper_range(dates_df['date']))


event_fields = [StructField("time", TimestampType(), True), StructField("event", StringType(), True)]
event_schema = StructType(event_fields)

events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')]
events_df = sqlContext.createDataFrame(events, event_schema)

events_df.show()

# finally, join the data
joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)    

joined_df.show()

I get the following output:

+-----+--------------------+
| name|                date|
+-----+--------------------+
|day_1|2015-01-01 00:00:...|
|day_2|2015-01-02 00:00:...|
|day_3|2015-01-03 00:00:...|
|day_4|2015-01-04 00:00:...|
+-----+--------------------+

+--------------------+-------+
|                time|  event|
+--------------------+-------+
|2015-01-03 00:00:...|meeting|
+--------------------+-------+


+-----+--------------------+--------------------+--------------------+--------------------+-------+
| name|                date|     lower_timestamp|     upper_timestamp|                time|  event|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
|day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting|
|day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting|
+-----+--------------------+--------------------+--------------------+--------------------+-------+

Answer

I ran the Spark SQL query with explain() to see how the join is executed, and then replicated the same behavior in Python. First, here is how to do the same thing with Spark SQL:

dates_df.registerTempTable("dates")
events_df.registerTempTable("events")
results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time and  events.time < dates.upper_timestamp")
results.explain()
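
As a side note, registerTempTable and SQLContext belong to the older Spark 1.x API. On Spark 2.x or later, the same approach would look roughly like the sketch below; this assumes a SparkSession named spark is already available and is not part of the original answer:

# register the dataframes as temporary views (the Spark 2.x+ replacement for registerTempTable)
dates_df.createOrReplaceTempView("dates")
events_df.createOrReplaceTempView("events")

# same range condition, issued through the SparkSession SQL interface
results = spark.sql("""
    SELECT *
    FROM dates
    INNER JOIN events
      ON dates.lower_timestamp < events.time
     AND events.time < dates.upper_timestamp
""")
results.explain()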

That SQL query works, but the question was about how to do it in Python with the DataFrame API, so the solution seems to be just a plain join followed by two filters:

joined_df = dates_df.join(events_df) \
    .filter(dates_df.lower_timestamp < events_df.time) \
    .filter(events_df.time < dates_df.upper_timestamp)

joined_df.explain() yields the same query plan as the Spark SQL results.explain(), so I assume this is how it is done.
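
The reason the original chained comparison misbehaved is that Python itself evaluates dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp as two comparisons joined by and, which Column objects cannot intercept, so only part of the condition ever reaches Spark; that is why day_4 also showed up in the joined output above. As an alternative to the two filter() calls, the same range condition can also be expressed as a single join condition built with the & operator (the parentheses are required because & binds tighter than <). This is a minimal sketch of that variant, and the exact physical plan may depend on the Spark version:

# one compound join condition instead of two separate filters;
# & builds a single Spark Column expression, unlike Python's chained comparison
joined_df = dates_df.join(
    events_df,
    (dates_df.lower_timestamp < events_df.time) &
    (events_df.time < dates_df.upper_timestamp))

joined_df.show()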
