PySpark and time series data: how to smartly avoid overlapping dates?


Question

I have the following sample Spark dataframe

import datetime as dt
import pandas as pd
import pyspark
import pyspark.sql.functions as fn
from pyspark.sql.window import Window

raw_df = pd.DataFrame([
    (1115, dt.datetime(2019,8,5,18,20), dt.datetime(2019,8,5,18,40)),
    (484, dt.datetime(2019,8,5,18,30), dt.datetime(2019,8,9,18,40)),
    (484, dt.datetime(2019,8,4,18,30), dt.datetime(2019,8,6,18,40)),
    (484, dt.datetime(2019,8,2,18,30), dt.datetime(2019,8,3,18,40)),
    (484, dt.datetime(2019,8,7,18,50), dt.datetime(2019,8,9,18,50)),
    (1115, dt.datetime(2019,8,6,18,20), dt.datetime(2019,8,6,18,40)),
], columns=['server_id', 'start_time', 'end_time'])
# `spark` is assumed to be an existing SparkSession (e.g. the one provided by the pyspark shell)
df = spark.createDataFrame(raw_df)

which results in:

+---------+-------------------+-------------------+
|server_id|         start_time|           end_time|
+---------+-------------------+-------------------+
|     1115|2019-08-05 18:20:00|2019-08-05 18:40:00|
|      484|2019-08-05 18:30:00|2019-08-09 18:40:00|
|      484|2019-08-04 18:30:00|2019-08-06 18:40:00|
|      484|2019-08-02 18:30:00|2019-08-03 18:40:00|
|      484|2019-08-07 18:50:00|2019-08-09 18:50:00|
|     1115|2019-08-06 18:20:00|2019-08-06 18:40:00|
+---------+-------------------+-------------------+

This indicates the usage date ranges of each server. I want to convert this into a time series of non-overlapping dates.

I want to achieve this without using UDFs.

This is what I'm doing now, which is wrong

w = Window().orderBy(fn.lit('A'))
# Separate start/end date of usage into rows
df = (df.withColumn('start_end_time', fn.array('start_time', 'end_time'))
    .withColumn('event_dt', fn.explode('start_end_time'))
    .withColumn('row_num', fn.row_number().over(w)))
# Indicate start/end date of the usage (start date will always be on odd rows)
df = (df.withColumn('is_start', fn.when(fn.col('row_num')%2 == 0, 0).otherwise(1))
    .select('server_id', 'event_dt', 'is_start'))

which gives:

+---------+-------------------+--------+
|server_id|           event_dt|is_start|
+---------+-------------------+--------+
|     1115|2019-08-05 18:20:00|       1|
|     1115|2019-08-05 18:40:00|       0|
|      484|2019-08-05 18:30:00|       1|
|      484|2019-08-09 18:40:00|       0|
|      484|2019-08-04 18:30:00|       1|
|      484|2019-08-06 18:40:00|       0|
|      484|2019-08-02 18:30:00|       1|
|      484|2019-08-03 18:40:00|       0|
|      484|2019-08-07 18:50:00|       1|
|      484|2019-08-09 18:50:00|       0|
|     1115|2019-08-06 18:20:00|       1|
|     1115|2019-08-06 18:40:00|       0|
+---------+-------------------+--------+

However, the final result I want to achieve is the following:

+---------+-------------------+--------+
|server_id|           event_dt|is_start|
+---------+-------------------+--------+
|     1115|2019-08-05 18:20:00|       1|
|     1115|2019-08-05 18:40:00|       0|
|     1115|2019-08-06 18:20:00|       1|
|     1115|2019-08-06 18:40:00|       0|
|      484|2019-08-02 18:30:00|       1|
|      484|2019-08-03 18:40:00|       0|
|      484|2019-08-04 18:30:00|       1|
|      484|2019-08-09 18:50:00|       0|
+---------+-------------------+--------+

So for server_id 484 I have the actual start and end dates without all the noise in between.

Do you have any suggestion on how to achieve that without using UDFs?

Thanks

Answer

IIUC, this is one of those problems that can be solved with the Window functions lag() and sum(), which add a sub-group label to ordered consecutive rows that match some specific condition. It is similar to what we do in Pandas with shift() + cumsum().

  1. Set up the Window spec w1:

w1 = Window.partitionBy('server_id').orderBy('start_time')

and calculate the following:

  • max('end_time'): the running maximum of end_time up to and including the current row, over window w1
  • lag('end_time'): the previous row's end_time
  • sum('prev_end_time < current_start_time ? 1 : 0'): the flag, accumulated over w1, that identifies the sub-group (these three intermediate columns are sketched below)
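
For reference, here is a minimal sketch (not part of the original answer) that materialises these three intermediate columns separately, assuming the df and w1 defined above; the column names running_end, prev_end and g are chosen only for illustration:

# running maximum of end_time up to the current row (Pandas cummax)
df_steps = df.withColumn('running_end', fn.max('end_time').over(w1))
# previous row's running end_time; in the answer's code, lag('end_time') sees the
# already-updated end_time because the first withColumn replaced that column (Pandas shift)
df_steps = df_steps.withColumn('prev_end', fn.lag('running_end').over(w1))
# start a new sub-group whenever the previous end lies strictly before the current start (Pandas cumsum)
df_steps = df_steps.withColumn(
    'g',
    fn.sum(fn.when(fn.col('prev_end') < fn.col('start_time'), 1).otherwise(0)).over(w1))
df_steps.orderBy('server_id', 'start_time').show()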

These three items correspond to Pandas cummax(), shift() and cumsum(), respectively.
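
As a cross-check, here is a minimal Pandas sketch of the same logic applied to the raw_df from the question (an illustration only, not part of the original answer):

pdf = raw_df.sort_values(['server_id', 'start_time']).copy()
# cummax: carry forward the largest end_time seen so far within each server
pdf['end_time'] = pdf.groupby('server_id')['end_time'].cummax()
# shift: previous row's (already cummax'd) end_time within each server
prev_end = pdf.groupby('server_id')['end_time'].shift()
# cumsum: start a new sub-group whenever the previous end lies before the current start
pdf['g'] = (prev_end < pdf['start_time']).astype(int).groupby(pdf['server_id']).cumsum()
result = (pdf.groupby(['server_id', 'g'])
             .agg(start_time=('start_time', 'min'), end_time=('end_time', 'max'))
             .reset_index())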

  2. Calculate df1 by updating df.end_time with max(end_time).over(w1) and setting up the sub-group label g, then group by (server_id, g) to calculate min(start_time) and max(end_time):

df1 = df.withColumn('end_time', fn.max('end_time').over(w1)) \
        .withColumn('g', fn.sum(fn.when(fn.lag('end_time').over(w1) < fn.col('start_time'),1).otherwise(0)).over(w1)) \
        .groupby('server_id', 'g') \
        .agg(fn.min('start_time').alias('start_time'), fn.max('end_time').alias('end_time'))

df1.show()
+---------+---+-------------------+-------------------+
|server_id|  g|         start_time|           end_time|
+---------+---+-------------------+-------------------+
|     1115|  0|2019-08-05 18:20:00|2019-08-05 18:40:00|
|     1115|  1|2019-08-06 18:20:00|2019-08-06 18:40:00|
|      484|  0|2019-08-02 18:30:00|2019-08-03 18:40:00|
|      484|  1|2019-08-04 18:30:00|2019-08-09 18:50:00|
+---------+---+-------------------+-------------------+

  3. After we have df1, we can split the data with two selects and then union the result sets:

    df_new = df1.selectExpr('server_id', 'start_time as event_dt', '1 as is_start').union(
             df1.selectExpr('server_id', 'end_time as event_dt', '0 as is_start')
    )        
    
    df_new.orderBy('server_id', 'event_dt').show()                                                                            
    +---------+-------------------+--------+
    |server_id|           event_dt|is_start|
    +---------+-------------------+--------+
    |      484|2019-08-02 18:30:00|       1|
    |      484|2019-08-03 18:40:00|       0|
    |      484|2019-08-04 18:30:00|       1|
    |      484|2019-08-09 18:50:00|       0|
    |     1115|2019-08-05 18:20:00|       1|
    |     1115|2019-08-05 18:40:00|       0|
    |     1115|2019-08-06 18:20:00|       1|
    |     1115|2019-08-06 18:40:00|       0|
    +---------+-------------------+--------+
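
As a side note (not from the original answer), the same final step could also reuse the array/explode idea from the question instead of a union; df_alt below is just an illustrative name:

df_alt = (df1
    .select('server_id', fn.explode(fn.array(
        fn.struct(fn.col('start_time').alias('event_dt'), fn.lit(1).alias('is_start')),
        fn.struct(fn.col('end_time').alias('event_dt'), fn.lit(0).alias('is_start'))
    )).alias('event'))
    .select('server_id', 'event.event_dt', 'event.is_start'))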
    
