What is the best way to fill missing info on all columns with Null\0 for missing records in a Spark dataframe while groupby?
Question
Let's say I have the following Spark dataframe:
+--------+----------+-----------+-------------------+-------------------+
|UserName|date |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B |2021-08-11|2 |2 |0 |
|A |2021-08-11|3 |2 |1 |
|B |2021-08-13|1 |1 |0 |
+--------+----------+-----------+-------------------+-------------------+
Now I want to not only impute the missing dates in the date column with the right dates, so that the dataframe keeps its continuous, equally sequenced time-series nature, but also fill the other columns with Null or 0 (preferably while grouping by).
My code is as below:
import time
import datetime as dt
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType
dict2 = [("2021-08-11 04:05:06", "A"),
         ("2021-08-11 04:15:06", "B"),
         ("2021-08-11 09:15:26", "A"),
         ("2021-08-11 11:04:06", "B"),
         ("2021-08-11 14:55:16", "A"),
         ("2021-08-13 04:12:11", "B"),
         ]
schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("UserName", StringType(), True),
])
#create a Spark dataframe (sc is an existing SparkContext, e.g. from the PySpark shell)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(data=dict2, schema=schema)
#sdf.printSchema()
#sdf.show(truncate=False)
#+-------------------+--------+
#|timestamp |UserName|
#+-------------------+--------+
#|2021-08-11 04:05:06|A |
#|2021-08-11 04:15:06|B |
#|2021-08-11 09:15:26|A |
#|2021-08-11 11:04:06|B |
#|2021-08-11 14:55:16|A |
#|2021-08-13 04:12:11|B |
#+-------------------+--------+
#Generate proper timestamp and date columns
sdf1 = sdf.withColumn('timestamp', F.to_timestamp('timestamp', 'yyyy-MM-dd HH:mm:ss')) \
          .withColumn('date', F.to_date('timestamp')) \
          .select('timestamp', 'date', 'UserName')
#sdf1.show(truncate = False)
#+-------------------+----------+--------+
#|timestamp |date |UserName|
#+-------------------+----------+--------+
#|2021-08-11 04:05:06|2021-08-11|A |
#|2021-08-11 04:15:06|2021-08-11|B |
#|2021-08-11 09:15:26|2021-08-11|A |
#|2021-08-11 11:04:06|2021-08-11|B |
#|2021-08-11 14:55:16|2021-08-11|A |
#|2021-08-13 04:12:11|2021-08-13|B |
#+-------------------+----------+--------+
#Aggregate log counts per feature (UserName) at the chosen time resolutions: PerDay (24 hrs), HalfDay (2 x 12 hrs)
df = sdf1.groupBy("UserName", "date").agg(
F.sum(F.hour("timestamp").between(0, 24).cast("int")).alias("NoLogPerDay"),
F.sum(F.hour("timestamp").between(0, 11).cast("int")).alias("NoLogPer-1st-12-hrs"),
F.sum(F.hour("timestamp").between(12, 23).cast("int")).alias("NoLogPer-2nd-12-hrs"),
).sort('date')
df.show(truncate = False)
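# the grouped df above reproduces the frame shown at the top of the question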
The problem is that when I groupBy on date and UserName, I miss the dates on which user B had activity but user A did not, or vice versa. So I'm interested in reflecting these no-activity days in the Spark dataframe by refilling those dates (no timestamp needed) and assigning 0 to those columns. I'm not sure whether I can do this while grouping, or before, or after!
I already checked some related posts as well as the window functions PySpark offers, and took inspiration from this answer; so far I've tried this:
# find the min and max dates available in the data
max_date = sdf1.select(F.max('date')).first()['max(date)']
min_date = sdf1.select(F.min('date')).first()['min(date)']
print(min_date) #2021-08-11
print(max_date) #2021-08-13
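# aside: each select().first() above triggers its own Spark job; a minimal
# single-pass alternative (same result, one job):
# min_date, max_date = sdf1.select(F.min('date'), F.max('date')).first()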
#compute the full list of dates between min_date & max_date (inclusive)
dates_list = [max_date - dt.timedelta(days=x) for x in range((max_date - min_date).days + 1)]
print(dates_list)
#create a temporary Spark dataframe for the date column, including the missing dates, at a 1-day interval
df2 = sqlCtx.createDataFrame([(d,) for d in dates_list], ["date"])
#Apply a left outer join between the dates frame and the grouped df on the date column
dff = df2.join(df, ["date"], "leftouter")
#dff.sort('date').show(truncate = False)
#possible to use .withColumn().otherwise()
#.withColumn('date',when(col('date').isNull(),to_date(lit('01.01.1900'),'dd.MM.yyyy')).otherwise(col('date')))
#Replace null with 0 in all numeric columns
dfff = dff.na.fill(value=0).sort('date')
dfff.select('date', 'UserName', 'NoLogPerDay', 'NoLogPer-1st-12-hrs', 'NoLogPer-2nd-12-hrs').sort('date').show(truncate = False)
Please note that I'm not interested in using a UDF or hacking it via toPandas().
So the expected results should be like below after groupBy:
+--------+----------+-----------+-------------------+-------------------+
|UserName|date |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B |2021-08-11|2 |2 |0 |
|A |2021-08-11|3 |2 |1 |
|B |2021-08-12|0 |0 |0 | <--
|A |2021-08-12|0 |0 |0 | <--
|B |2021-08-13|1 |1 |0 |
|A |2021-08-13|0 |0 |0 | <--
+--------+----------+-----------+-------------------+-------------------+
Answer
Here is one way to do it:
First, generate a new dataframe all_dates_df that contains the sequence of dates from the min to the max date in your grouped df. For this, you can use the sequence function:
import pyspark.sql.functions as F
all_dates_df = df.selectExpr(
"sequence(min(date), max(date), interval 1 day) as date"
).select(F.explode("date").alias("date"))
all_dates_df.show()
#+----------+
#| date|
#+----------+
#|2021-08-11|
#|2021-08-12|
#|2021-08-13|
#+----------+
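Equivalently, if you prefer the DataFrame API over selectExpr, the same range of dates can be built with the sequence and explode functions directly; a minimal sketch, assuming Spark 2.4+ where sequence is available:
# one row holding the array of all dates from min(date) to max(date), inclusive
all_dates_df = df.select(
    F.sequence(F.min("date"), F.max("date"), F.expr("interval 1 day")).alias("date")
).select(F.explode("date").alias("date"))  # then one row per date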
Now, you need to duplicate each date for all the users using a cross join with a dataframe of distinct UserName values, and finally join with the grouped df to get the desired output:
result_df = all_dates_df.crossJoin(
df.select("UserName").distinct()
).join(
df,
["UserName", "date"],
"left"
).fillna(0)
result_df.show()
#+--------+----------+-----------+-------------------+-------------------+
#|UserName| date|NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
#+--------+----------+-----------+-------------------+-------------------+
#| A|2021-08-11| 3| 2| 1|
#| B|2021-08-11| 2| 2| 0|
#| A|2021-08-12| 0| 0| 0|
#| B|2021-08-12| 0| 0| 0|
#| B|2021-08-13| 1| 1| 0|
#| A|2021-08-13| 0| 0| 0|
#+--------+----------+-----------+-------------------+-------------------+
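Note that fillna(0) replaces nulls in every numeric column of the joined frame. If your real dataframe carries other numeric columns whose nulls should survive, it is safer to restrict the fill to the count columns; a minimal sketch of that variant:
result_df = all_dates_df.crossJoin(
    df.select("UserName").distinct()       # one row per (date, user) pair
).join(
    df, ["UserName", "date"], "left"
).fillna(0, subset=["NoLogPerDay", "NoLogPer-1st-12-hrs", "NoLogPer-2nd-12-hrs"])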