PySpark and time series data: how to smartly avoid overlapping dates?

Problem description

I have the following sample Spark dataframe

import datetime as dt

import pandas as pd
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

# A SparkSession is assumed to be available; create one if running standalone
spark = SparkSession.builder.getOrCreate()

raw_df = pd.DataFrame([
    (1115, dt.datetime(2019, 8, 5, 18, 20), dt.datetime(2019, 8, 5, 18, 40)),
    (484, dt.datetime(2019, 8, 5, 18, 30), dt.datetime(2019, 8, 9, 18, 40)),
    (484, dt.datetime(2019, 8, 4, 18, 30), dt.datetime(2019, 8, 6, 18, 40)),
    (484, dt.datetime(2019, 8, 2, 18, 30), dt.datetime(2019, 8, 3, 18, 40)),
    (484, dt.datetime(2019, 8, 7, 18, 50), dt.datetime(2019, 8, 9, 18, 50)),
    (1115, dt.datetime(2019, 8, 6, 18, 20), dt.datetime(2019, 8, 6, 18, 40)),
], columns=['server_id', 'start_time', 'end_time'])
df = spark.createDataFrame(raw_df)

which results in

+---------+-------------------+-------------------+
|server_id|         start_time|           end_time|
+---------+-------------------+-------------------+
|     1115|2019-08-05 18:20:00|2019-08-05 18:40:00|
|      484|2019-08-05 18:30:00|2019-08-09 18:40:00|
|      484|2019-08-04 18:30:00|2019-08-06 18:40:00|
|      484|2019-08-02 18:30:00|2019-08-03 18:40:00|
|      484|2019-08-07 18:50:00|2019-08-09 18:50:00|
|     1115|2019-08-06 18:20:00|2019-08-06 18:40:00|
+---------+-------------------+-------------------+

This indicates the usage date ranges of each server. I want to convert this into a time series of non-overlapping dates.

I would like to achieve this without using UDFs.

This is what I'm doing now, which is wrong

w = Window().orderBy(fn.lit('A'))
# Separate start/end date of usage into rows
df = (df.withColumn('start_end_time', fn.array('start_time', 'end_time'))
    .withColumn('event_dt', fn.explode('start_end_time'))
    .withColumn('row_num', fn.row_number().over(w)))
# Indicate start/end date of the usage (start date will always be on odd rows)
df = (df.withColumn('is_start', fn.when(fn.col('row_num')%2 == 0, 0).otherwise(1))
    .select('server_id', 'event_dt', 'is_start'))

which gives

+---------+-------------------+--------+
|server_id|           event_dt|is_start|
+---------+-------------------+--------+
|     1115|2019-08-05 18:20:00|       1|
|     1115|2019-08-05 18:40:00|       0|
|      484|2019-08-05 18:30:00|       1|
|      484|2019-08-09 18:40:00|       0|
|      484|2019-08-04 18:30:00|       1|
|      484|2019-08-06 18:40:00|       0|
|      484|2019-08-02 18:30:00|       1|
|      484|2019-08-03 18:40:00|       0|
|      484|2019-08-07 18:50:00|       1|
|      484|2019-08-09 18:50:00|       0|
|     1115|2019-08-06 18:20:00|       1|
|     1115|2019-08-06 18:40:00|       0|
+---------+-------------------+--------+

But the end result I want to achieve is the following:

+---------+-------------------+--------+
|server_id|           event_dt|is_start|
+---------+-------------------+--------+
|     1115|2019-08-05 18:20:00|       1|
|     1115|2019-08-05 18:40:00|       0|
|     1115|2019-08-06 18:20:00|       1|
|     1115|2019-08-06 18:40:00|       0|
|      484|2019-08-02 18:30:00|       1|
|      484|2019-08-03 18:40:00|       0|
|      484|2019-08-04 18:30:00|       1|
|      484|2019-08-09 18:50:00|       0|
+---------+-------------------+--------+

So for server_id 484 I have the actual start and end dates without all the noise in between.

Do you have any suggestion on how to achieve that without using UDFs?

Thanks

Answer

IIUC, this is one of those problems that can be solved with the Window functions lag() and sum(): add a sub-group label to ordered consecutive rows that satisfy some condition, similar to what we would do in pandas with shift() + cumsum().

  1. Set up the Window spec w1:

w1 = Window.partitionBy('server_id').orderBy('start_time')

and calculate the following over it:

  • max('end_time'): the running max of end_time up to and including the current row, over window w1
  • lag('end_time'): the previous row's end_time
  • sum('prev_end_time < current_start_time ? 1 : 0'): a flag that identifies the sub-group

These three items correspond to pandas cummax(), shift() and cumsum(), respectively.
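
For intuition, here is a minimal pandas sketch of the same idea, reusing the raw_df pandas DataFrame defined in the question (the pdf, prev_end and merged names are only illustrative, not part of the answer):

pdf = raw_df.sort_values(['server_id', 'start_time']).copy()
# cummax() plays the role of fn.max('end_time').over(w1): a running max of end_time per server
pdf['end_time'] = pdf.groupby('server_id')['end_time'].cummax()
# shift() plays the role of fn.lag('end_time').over(w1): the previous (running-max) end_time
prev_end = pdf.groupby('server_id')['end_time'].shift()
# cumsum() plays the role of fn.sum(...).over(w1): accumulate a flag that starts a new
# sub-group whenever the previous end_time is before the current start_time
pdf['g'] = (prev_end < pdf['start_time']).astype(int).groupby(pdf['server_id']).cumsum()
merged = (pdf.groupby(['server_id', 'g'], as_index=False)
             .agg(start_time=('start_time', 'min'), end_time=('end_time', 'max')))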

  2. Calculate df1 by updating df.end_time with max('end_time').over(w1) and setting up the sub-group label g, then do groupby('server_id', 'g') to calculate min('start_time') and max('end_time'):

df1 = df.withColumn('end_time', fn.max('end_time').over(w1)) \
        .withColumn('g', fn.sum(fn.when(fn.lag('end_time').over(w1) < fn.col('start_time'), 1)
                                .otherwise(0)).over(w1)) \
        .groupby('server_id', 'g') \
        .agg(fn.min('start_time').alias('start_time'), fn.max('end_time').alias('end_time'))

df1.show()
+---------+---+-------------------+-------------------+
|server_id|  g|         start_time|           end_time|
+---------+---+-------------------+-------------------+
|     1115|  0|2019-08-05 18:20:00|2019-08-05 18:40:00|
|     1115|  1|2019-08-06 18:20:00|2019-08-06 18:40:00|
|      484|  0|2019-08-02 18:30:00|2019-08-03 18:40:00|
|      484|  1|2019-08-04 18:30:00|2019-08-09 18:50:00|
+---------+---+-------------------+-------------------+

  3. After we have df1, we can split the data using two selects and then union the result set:

df_new = df1.selectExpr('server_id', 'start_time as event_dt', '1 as is_start').union(
         df1.selectExpr('server_id', 'end_time as event_dt', '0 as is_start')
)

df_new.orderBy('server_id', 'event_dt').show()
+---------+-------------------+--------+
|server_id|           event_dt|is_start|
+---------+-------------------+--------+
|      484|2019-08-02 18:30:00|       1|
|      484|2019-08-03 18:40:00|       0|
|      484|2019-08-04 18:30:00|       1|
|      484|2019-08-09 18:50:00|       0|
|     1115|2019-08-05 18:20:00|       1|
|     1115|2019-08-05 18:40:00|       0|
|     1115|2019-08-06 18:20:00|       1|
|     1115|2019-08-06 18:40:00|       0|
+---------+-------------------+--------+
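
As an optional sanity check (not part of the original answer; the overlaps name is just illustrative), you can confirm that the merged intervals in df1 no longer overlap by comparing each row's start_time with the previous end_time within the same server. An empty result means the merge worked:

overlaps = df1.withColumn('prev_end', fn.lag('end_time').over(w1)) \
    .filter(fn.col('prev_end') >= fn.col('start_time'))
overlaps.show()  # expected to be empty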
    
