Joining two spark dataframes on time (TimestampType) in python

Problem description

I have two dataframes and I would like to join them based on one column, with a caveat that this column is a timestamp, and that timestamp has to be within a certain offset (5 seconds) in order to join records. More specifically, a record in dates_df with date=1/3/2015:00:00:00 should be joined with events_df with time=1/3/2015:00:00:01 because both timestamps are within 5 seconds from each other.
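
To make the matching rule concrete, here is a small plain-Python sketch of the check a matched pair should pass (the values are just the example above, and the 5-second window is symmetric around the date):

from datetime import datetime, timedelta

offset = timedelta(seconds=5)
date = datetime(2015, 1, 3, 0, 0, 0)   # a record from dates_df
time = datetime(2015, 1, 3, 0, 0, 1)   # a record from events_df
print(abs(time - date) < offset)       # True -> the two records should be joined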

I'm trying to get this logic working with python spark, and it is extremely painful. How do people do joins like this in spark?

My approach is to add two extra columns to dates_df that will determine the lower_timestamp and upper_timestamp bounds with a 5 second offset, and perform a conditional join. And this is where it fails, more specifically:

joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)

joined_df.explain()

Captures only the last part of the query:

Filter (time#6 < upper_timestamp#4)
 CartesianProduct
 ....

and it gives me a wrong result.

Do I really have to do a full blown cartesian join for each inequality, removing duplicates as I go along?

Here is the full code:

from datetime import datetime, timedelta

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf


master = 'local[*]'
app_name = 'stackoverflow_join'

conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

def lower_range_func(x, offset=5):
    return x - timedelta(seconds=offset)

def upper_range_func(x, offset=5):
    return x + timedelta(seconds=offset)


lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())

dates_fields = [StructField("name", StringType(), True), StructField("date", TimestampType(), True)]
dates_schema = StructType(dates_fields)

dates = [('day_%s' % x, datetime(year=2015, day=x, month=1)) for x in range(1,5)]
dates_df = sqlContext.createDataFrame(dates, dates_schema)

dates_df.show()

# extend dates_df with time ranges
dates_df = dates_df.withColumn('lower_timestamp', lower_range(dates_df['date'])).\
           withColumn('upper_timestamp', upper_range(dates_df['date']))


event_fields = [StructField("time", TimestampType(), True), StructField("event", StringType(), True)]
event_schema = StructType(event_fields)

events = [(datetime(year=2015, day=3, month=1, second=3), 'meeting')]
events_df = sqlContext.createDataFrame(events, event_schema)

events_df.show()

# finally, join the data
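# NOTE: this chained comparison is the join that misbehaves; explain() above only kept the last inequality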
joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)    

joined_df.show()

I get the following output:

+-----+--------------------+
| name|                date|
+-----+--------------------+
|day_1|2015-01-01 00:00:...|
|day_2|2015-01-02 00:00:...|
|day_3|2015-01-03 00:00:...|
|day_4|2015-01-04 00:00:...|
+-----+--------------------+

+--------------------+-------+
|                time|  event|
+--------------------+-------+
|2015-01-03 00:00:...|meeting|
+--------------------+-------+


+-----+--------------------+--------------------+--------------------+--------------------+-------+
| name|                date|     lower_timestamp|     upper_timestamp|                time|  event|
+-----+--------------------+--------------------+--------------------+--------------------+-------+
|day_3|2015-01-03 00:00:...|2015-01-02 23:59:...|2015-01-03 00:00:...|2015-01-03 00:00:...|meeting|
|day_4|2015-01-04 00:00:...|2015-01-03 23:59:...|2015-01-04 00:00:...|2015-01-03 00:00:...|meeting|
+-----+--------------------+--------------------+--------------------+--------------------+-------+

Answer

I ran the Spark SQL query with explain() to see how it is done, and replicated the same behavior in Python. First, here is how to do it with Spark SQL:

dates_df.registerTempTable("dates")
events_df.registerTempTable("events")
results = sqlContext.sql("SELECT * FROM dates INNER JOIN events ON dates.lower_timestamp < events.time AND events.time < dates.upper_timestamp")
results.explain()

This works, but the question was about how to do it in python, so the solution seems to be just a plain join, followed by two filters:

joined_df = dates_df.join(events_df).filter(dates_df.lower_timestamp < events_df.time).filter(events_df.time < dates_df.upper_timestamp)

joined_df.explain() yields the same query plan as the Spark SQL results.explain(), so I assume this is how things are done.
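
For completeness, the two inequalities can also be combined into a single join condition with the & operator. Python's chained comparison a < b < c expands to (a < b) and (b < c), and the "and" keyword cannot be overloaded for Column objects, which is why explain() in the question only captured the last inequality (newer PySpark versions raise an error here instead of silently dropping a condition). A minimal sketch, assuming the same dates_df and events_df as above:

# single join call; & builds a compound Column condition
joined_df = dates_df.join(
    events_df,
    (dates_df.lower_timestamp < events_df.time) & (events_df.time < dates_df.upper_timestamp))

joined_df.show()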
