count id for date range in pyspark


Problem description

I have a pyspark dataframe with columns parsed_date (dtype: date) and id (dtype: bigint), as shown below:

+-------+-----------+
|     id|parsed_date|
+-------+-----------+
|1471783| 2017-12-18|
|1471885| 2017-12-18|
|1472928| 2017-12-19|
|1476917| 2017-12-19|
|1477469| 2017-12-21|
|1478190| 2017-12-21|
|1478570| 2017-12-19|
|1481415| 2017-12-21|
|1472592| 2017-12-20|
|1474023| 2017-12-22|
|1474029| 2017-12-22|
|1474067| 2017-12-24|
+-------+-----------+
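
For reference, a minimal sketch of how this sample dataframe could be reproduced (assuming an active SparkSession named spark):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

rows = [
    (1471783, "2017-12-18"), (1471885, "2017-12-18"),
    (1472928, "2017-12-19"), (1476917, "2017-12-19"),
    (1477469, "2017-12-21"), (1478190, "2017-12-21"),
    (1478570, "2017-12-19"), (1481415, "2017-12-21"),
    (1472592, "2017-12-20"), (1474023, "2017-12-22"),
    (1474029, "2017-12-22"), (1474067, "2017-12-24"),
]

# id is inferred as bigint; parsed_date is cast from string to date
df = (spark.createDataFrame(rows, ["id", "parsed_date"])
           .withColumn("parsed_date", F.to_date("parsed_date")))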

I have a function as shown below. The aim is to pass a date (day) and t (a number of days). In df1 the ids are counted in the range (day - t, day), and in df2 the ids are counted in the range (day, day + t).

from pyspark.sql import functions as F, Window

def hypo_1(df, day, t):
    # ids in the t days before `day` (exclusive), with a per-date count
    df1 = (df.filter(f"parsed_date between '{day}' - interval {t} days and '{day}' - interval 1 day")
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    # ids in the t days after `day` (exclusive), with a per-date count
    df2 = (df.filter(f"parsed_date between '{day}' + interval 1 day and '{day}' + interval {t} days")
             .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    return [df1, df2]

df1, df2 = hypo_1(df, '2017-12-20', 2)
df1.show()
+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1471885| 2017-12-18|           2|
|1472928| 2017-12-19|           3|
|1476917| 2017-12-19|           3|
|1478570| 2017-12-19|           3|
+-------+-----------+------------+

df2.show()
+-------+-----------+-----------+
|     id|parsed_date|count_after|
+-------+-----------+-----------+
|1481415| 2017-12-21|          3|
|1478190| 2017-12-21|          3|
|1477469| 2017-12-21|          3|
|1474023| 2017-12-22|          2|
|1474029| 2017-12-22|          2|
+-------+-----------+-----------+

I am wondering how this code can be fixed if a date is missing within the range. Say there is no record for 2017-12-22: is it possible to fall back to the nearest days that are in the record? That is, if 2017-12-22 is missing and the next date after 2017-12-21 is 2017-12-24, can that date be picked up instead?

Credits to mck for helping create the function hypo_1(df, day, t).

Answer

I removed the 2017-12-22 rows to illustrate. The idea is to get a dense_rank ordered by date (descending for the "before" side, ascending for the "after" side), and to filter the rows with rank <= 2, i.e. the two closest dates.

from pyspark.sql import functions as F, Window

def hypo_1(df, day, t):
    # the two closest dates strictly before `day`, with a per-date count
    df1 = (df.filter(f"parsed_date < '{day}'")
             .withColumn('rn', F.dense_rank().over(Window.orderBy(F.desc('parsed_date'))))
             .filter('rn <= 2')
             .drop('rn')
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    # the two closest dates strictly after `day`, with a per-date count
    df2 = (df.filter(f"parsed_date > '{day}'")
             .withColumn('rn', F.dense_rank().over(Window.orderBy('parsed_date')))
             .filter('rn <= 2')
             .drop('rn')
             .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    return [df1, df2]

df1, df2 = hypo_1(df, '2017-12-20', 2)
df1.show()
+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1471885| 2017-12-18|           2|
|1472928| 2017-12-19|           3|
|1476917| 2017-12-19|           3|
|1478570| 2017-12-19|           3|
+-------+-----------+------------+

df2.show()
+-------+-----------+-----------+
|     id|parsed_date|count_after|
+-------+-----------+-----------+
|1477469| 2017-12-21|          3|
|1481415| 2017-12-21|          3|
|1478190| 2017-12-21|          3|
|1474067| 2017-12-24|          1|
+-------+-----------+-----------+
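
Note that the rn <= 2 filter above is hard-coded to match the call hypo_1(df, '2017-12-20', 2). If the same fallback behaviour is wanted for other values of t, one possible (untested) variation is to substitute the parameter into the filter, for example for df1:

    # hypothetical variation: keep the t closest earlier dates instead of always 2
    df1 = (df.filter(f"parsed_date < '{day}'")
             .withColumn('rn', F.dense_rank().over(Window.orderBy(F.desc('parsed_date'))))
             .filter(f'rn <= {t}')
             .drop('rn')
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )

and analogously for df2.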
