What is the best way to fill missing info on all columns with Null/0 for missing records in a Spark dataframe while grouping by?


Problem description

Let's say I have the following Spark dataframe:

+--------+----------+-----------+-------------------+-------------------+
|UserName|date      |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B       |2021-08-11|2          |2                  |0                  |
|A       |2021-08-11|3          |2                  |1                  |
|B       |2021-08-13|1          |1                  |0                  |
+--------+----------+-----------+-------------------+-------------------+

Now I want to not only impute the missing dates in the date column with the right dates, so that the dataframe keeps its continuous time-series nature and stays equally sequenced, but also impute the other columns with Null or 0 (preferably while doing the groupBy).

My code is as follows:

import datetime as dt

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DateType

dict2  = [("2021-08-11 04:05:06", "A"),
         ("2021-08-11 04:15:06", "B"),
         ("2021-08-11 09:15:26", "A"),
         ("2021-08-11 11:04:06", "B"),
         ("2021-08-11 14:55:16", "A"),
         ("2021-08-13 04:12:11", "B"),

  ]

schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("UserName",  StringType(), True),
])
 
#create a Spark dataframe
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data=dict2, schema=schema)
#sdf.printSchema()
#sdf.show(truncate=False)
#+-------------------+--------+
#|timestamp          |UserName|
#+-------------------+--------+
#|2021-08-11 04:05:06|A       |
#|2021-08-11 04:15:06|B       |
#|2021-08-11 09:15:26|A       |
#|2021-08-11 11:04:06|B       |
#|2021-08-11 14:55:16|A       |
#|2021-08-13 04:12:11|B       |
#+-------------------+--------+

#Generate date and timestamp
sdf1 = sdf.withColumn('timestamp',    F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss").cast(TimestampType())) \
            .withColumn('date',         F.to_date("timestamp",      "yyyy-MM-dd").cast(DateType())) \
            .select('timestamp', 'date', 'UserName') 

#sdf1.show(truncate = False)

#+-------------------+----------+--------+
#|timestamp          |date      |UserName|
#+-------------------+----------+--------+
#|2021-08-11 04:05:06|2021-08-11|A       |
#|2021-08-11 04:15:06|2021-08-11|B       |
#|2021-08-11 09:15:26|2021-08-11|A       |
#|2021-08-11 11:04:06|2021-08-11|B       |
#|2021-08-11 14:55:16|2021-08-11|A       |
#|2021-08-13 04:12:11|2021-08-13|B       |
#+-------------------+----------+--------+

#Aggregate record counts per UserName at certain time resolutions: PerDay (24 hrs) and HalfDay (2 x 12 hrs)
df = sdf1.groupBy("UserName", "date").agg(
    F.sum(F.hour("timestamp").between(0, 24).cast("int")).alias("NoLogPerDay"),
    F.sum(F.hour("timestamp").between(0, 11).cast("int")).alias("NoLogPer-1st-12-hrs"),
    F.sum(F.hour("timestamp").between(12, 23).cast("int")).alias("NoLogPer-2nd-12-hrs"),

).sort('date')

df.show(truncate = False)
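
Note that since F.hour() returns values in 0-23, between(0, 24) matches every row, so NoLogPerDay is effectively a plain row count. An equivalent way to express the same aggregation (a sketch; df_alt is just an illustrative name, not part of the original code):

#Equivalent aggregation: count all rows per (UserName, date) and split the half-day buckets with conditional sums
df_alt = sdf1.groupBy("UserName", "date").agg(
    F.count("timestamp").alias("NoLogPerDay"),
    F.sum(F.when(F.hour("timestamp") < 12, 1).otherwise(0)).alias("NoLogPer-1st-12-hrs"),
    F.sum(F.when(F.hour("timestamp") >= 12, 1).otherwise(0)).alias("NoLogPer-2nd-12-hrs"),
).sort("date")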

The problem is that when I groupBy on date and UserName, I miss some dates on which user B had activity but user A did not, or vice versa. So I'm interested in reflecting these periods of no activity in the Spark dataframe by refilling those dates (no timestamp needed) and allocating 0 to those columns. I'm not sure whether I can do this while grouping, or before, or after!

I already checked some related posts as well as the window functions PySpark offers, and was inspired by this answer, so until now I've tried this:

# compute the list of all dates from available dates
max_date = sdf1.select(F.max('date')).first()['max(date)']
min_date = sdf1.select(F.min('date')).first()['min(date)']
print(min_date) #2021-08-11
print(max_date) #2021-08-13
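
#(a sketch, not in the original post: both bounds can also be fetched in a single pass; "bounds" is just an illustrative name)
#bounds = sdf1.agg(F.min("date").alias("min_date"), F.max("date").alias("max_date")).first()
#min_date, max_date = bounds["min_date"], bounds["max_date"]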

#compute list of available dates based on min_date & max_date from available data
dates_list = [max_date - dt.timedelta(days=x) for x in range((max_date - min_date).days +1)]
print(dates_list)
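#for the sample data above: [datetime.date(2021, 8, 13), datetime.date(2021, 8, 12), datetime.date(2021, 8, 11)]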

#create a temporary Spark dataframe for the date column, including missing dates at a 1-day interval
#(each date is wrapped in a tuple so the schema can be inferred and the column named "date")
df2 = spark.createDataFrame(data=[(d,) for d in dates_list], schema=["date"])

#Apply leftouter join on date column
dff = df2.join(sdf1, ["date"], "leftouter")
#dff.sort('date').show(truncate = False)

#possible to use .withColumn().otherwise()
#.withColumn('date',when(col('date').isNull(),to_date(lit('01.01.1900'),'dd.MM.yyyy')).otherwise(col('date')))

#Replace null with 0 for all integer columns
dfff = dff.na.fill(value=0).sort('date')

dfff.select('date', 'UserName', 'NoLogPerDay', 'NoLogPer-1st-12-hrs', 'NoLogPer-2nd-12-hrs').sort('date').show(truncate=False)

Please note that I'm not interested in using a UDF or hacking it via toPandas().

So the expected result after the groupBy should look like this:

+--------+----------+-----------+-------------------+-------------------+
|UserName|date      |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B       |2021-08-11|2          |2                  |0                  |
|A       |2021-08-11|3          |2                  |1                  | 
|B       |2021-08-12|0          |0                  |0                  | <--
|A       |2021-08-12|0          |0                  |0                  | <--
|B       |2021-08-13|1          |1                  |0                  |
|A       |2021-08-13|0          |0                  |0                  | <--
+--------+----------+-----------+-------------------+-------------------+

Recommended answer

Here is one way to do it:

First, generate a new dataframe all_dates_df that contains the sequence of dates from the min to the max date in your grouped df. For this you can use the sequence function:

import pyspark.sql.functions as F

all_dates_df = df.selectExpr(
    "sequence(min(date), max(date), interval 1 day) as date"
).select(F.explode("date").alias("date"))

all_dates_df.show()
#+----------+
#|      date|
#+----------+
#|2021-08-11|
#|2021-08-12|
#|2021-08-13|
#+----------+
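
The same date scaffold can also be built with the DataFrame API instead of selectExpr (a sketch under the same assumptions, i.e. df is the grouped frame above; "bounds" is just an illustrative intermediate name):

#compute the min/max bounds first, then explode the generated date sequence
bounds = df.agg(F.min("date").alias("start"), F.max("date").alias("stop"))
all_dates_df = bounds.select(
    F.explode(F.sequence("start", "stop", F.expr("interval 1 day"))).alias("date")
)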

Now, you need to duplicate each date for all the users using a cross join with the distinct UserName dataframe, and finally join with the grouped df to get the desired output:

result_df = all_dates_df.crossJoin(
    df.select("UserName").distinct()
).join(
    df, 
    ["UserName", "date"],
    "left"
).fillna(0)

result_df.show()
#+--------+----------+-----------+-------------------+-------------------+
#|UserName|      date|NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
#+--------+----------+-----------+-------------------+-------------------+
#|       A|2021-08-11|          3|                  2|                  1|
#|       B|2021-08-11|          2|                  2|                  0|
#|       A|2021-08-12|          0|                  0|                  0|
#|       B|2021-08-12|          0|                  0|                  0|
#|       B|2021-08-13|          1|                  1|                  0|
#|       A|2021-08-13|          0|                  0|                  0|
#+--------+----------+-----------+-------------------+-------------------+
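
A small variant of this last step (a sketch, not part of the original answer): restrict the zero-fill to the three count columns via the subset argument of fillna, so any other nullable column is left untouched, and make the time-series ordering explicit:

result_df = all_dates_df.crossJoin(
    df.select("UserName").distinct()
).join(
    df, ["UserName", "date"], "left"
).fillna(
    0, subset=["NoLogPerDay", "NoLogPer-1st-12-hrs", "NoLogPer-2nd-12-hrs"]
).orderBy("date", "UserName")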
