Filling missing sales values with zero and calculating a 3-month average in PySpark
Question

I want to fill the missing dates with zero sales and calculate a 3-month average in PySpark.
My input:

product  specialty  date       sales
A        pharma     1/3/2019   50
A        pharma     1/4/2019   60
A        pharma     1/5/2019   70
A        pharma     1/8/2019   80
A        ENT        1/8/2019   50
A        ENT        1/9/2019   65
A        ENT        1/11/2019  40
My output:

product  specialty  date       sales  3month_avg_sales
A        pharma     1/3/2019   50     16.67
A        pharma     1/4/2019   60     36.67
A        pharma     1/5/2019   70     60
A        pharma     1/6/2019   0      43.33
A        pharma     1/7/2019   0      23.33
A        pharma     1/8/2019   80     26.67
A        ENT        1/8/2019   50     16.67
A        ENT        1/9/2019   65     38.33
A        ENT        1/10/2019  0      38.33
A        ENT        1/11/2019  40     35
from pyspark.sql import Row, Window
from pyspark.sql import functions as F
from pyspark.sql.functions import expr, month, year

row = Row("Product", "specialty", "Date", "Sales")
df = sc.parallelize([
    row("A", "pharma", "1/3/2019", 50), row("A", "pharma", "1/4/2019", 60),
    row("A", "pharma", "1/5/2019", 70), row("A", "pharma", "1/8/2019", 80),
    row("A", "ENT", "1/8/2019", 50), row("A", "ENT", "1/9/2019", 65),
    row("A", "ENT", "1/11/2019", 40)
]).toDF()

w = Window.partitionBy("Product", "specialty").orderBy("Date")
df = df.withColumn("new_data_date", expr("add_months(Date, 1)"))
# stuck here: trying to match each date against new_data_date so the gaps can get sales=0
df = df.withColumn('index', (year('Date') - 2020) * 12 + month('Date')) \
       .withColumn('avg', F.sum('Sales').over(w) / 3)
I am stuck on inserting rows with zero sales wherever a date is missing, and then calculating the 3-month average.
Recommended answer
You can use the Spark SQL built-in functions transform + sequence to create the missing months and set their sales = 0, then use a Window aggregate function to calculate the required end_date and the final 3-month average sales. Below I divided the code into three steps for illustration purposes; you can merge them based on your own requirements.
Note: this assumes at most one record in each distinct month and that all date values have day = 1; otherwise, truncate the date to the month level using F.trunc(F.to_date('date', 'd/M/yyyy'), "month") and/or define the logic to handle duplicate entries.
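For illustration, the effect of truncating a date to the month level can be sketched in plain Python (no Spark needed); this mirrors what F.trunc(col, "month") does for a date column:

```python
from datetime import date

def trunc_to_month(d: date) -> date:
    """Truncate a date to the first day of its month (like F.trunc(col, "month"))."""
    return d.replace(day=1)

print(trunc_to_month(date(2019, 3, 15)))  # 2019-03-01
```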
from pyspark.sql import functions as F, Window
df = spark.createDataFrame([
('A', 'pharma', '1/3/2019', 50), ('A', 'pharma', '1/4/2019', 60),
('A', 'pharma', '1/5/2019', 70), ('A', 'pharma', '1/8/2019', 80),
('A', 'ENT', '1/8/2019', 50), ('A', 'ENT', '1/9/2019', 65),
('A', 'ENT', '1/11/2019', 40)
], ['product', 'specialty', 'date', 'sales'])
df = df.withColumn('date', F.to_date('date', 'd/M/yyyy'))
Step-1: set up WinSpec w1 and use the Window aggregate function lead to find the next date over(w1), then shift it back one month to set up the date sequences:
w1 = Window.partitionBy('product', 'specialty').orderBy('date')
df1 = df.withColumn('end_date', F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
+-------+---------+----------+-----+----------+
|product|specialty| date|sales| end_date|
+-------+---------+----------+-----+----------+
| A| ENT|2019-08-01| 50|2019-08-01|
| A| ENT|2019-09-01| 65|2019-10-01|
| A| ENT|2019-11-01| 40|2019-11-01|
| A| pharma|2019-03-01| 50|2019-03-01|
| A| pharma|2019-04-01| 60|2019-04-01|
| A| pharma|2019-05-01| 70|2019-07-01|
| A| pharma|2019-08-01| 80|2019-08-01|
+-------+---------+----------+-----+----------+
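The end_date logic above can be sanity-checked without Spark: for each row in a group, take the next row's date, step it back one month, and fall back to the row's own date when there is no next row (the coalesce). A minimal plain-Python sketch (add_months is a hand-rolled stand-in, valid for day=1 dates):

```python
from datetime import date

def add_months(d, n):
    # minimal add_months for month-start dates
    m = d.month - 1 + n
    return date(d.year + m // 12, m % 12 + 1, 1)

def with_end_date(dates):
    """dates: sorted month-start dates for one (product, specialty) group."""
    out = []
    for i, d in enumerate(dates):
        nxt = dates[i + 1] if i + 1 < len(dates) else None
        out.append(add_months(nxt, -1) if nxt else d)  # coalesce(add_months(lead, -1), date)
    return out

pharma = [date(2019, 3, 1), date(2019, 4, 1), date(2019, 5, 1), date(2019, 8, 1)]
print(with_end_date(pharma))
# [datetime.date(2019, 3, 1), datetime.date(2019, 4, 1), datetime.date(2019, 7, 1), datetime.date(2019, 8, 1)]
```

These values match the end_date column in the table above.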
Step-2: use months_between(end_date, date) to calculate the number of months between the two dates, and use the transform function to iterate through sequence(0, #months), creating a named_struct with date = add_months(date, i) and sales = IF(i=0, sales, 0); then use inline_outer to explode the resulting array of structs:
df2 = df1.selectExpr("product", "specialty", """
inline_outer(
transform(
sequence(0,int(months_between(end_date, date))),
i -> (add_months(date,i) as date, IF(i=0,sales,0) as sales)
)
)
""")
+-------+---------+----------+-----+
|product|specialty| date|sales|
+-------+---------+----------+-----+
| A| ENT|2019-08-01| 50|
| A| ENT|2019-09-01| 65|
| A| ENT|2019-10-01| 0|
| A| ENT|2019-11-01| 40|
| A| pharma|2019-03-01| 50|
| A| pharma|2019-04-01| 60|
| A| pharma|2019-05-01| 70|
| A| pharma|2019-06-01| 0|
| A| pharma|2019-07-01| 0|
| A| pharma|2019-08-01| 80|
+-------+---------+----------+-----+
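The sequence/transform/inline_outer step amounts to: for each row, emit one record per month from date through end_date, with the original sales on the first month and 0 on the gap-filling months. A plain-Python equivalent of that expansion (months_between and add_months are hand-rolled stand-ins for the Spark functions, valid for month-start dates):

```python
from datetime import date

def add_months(d, n):
    # minimal add_months for month-start dates
    m = d.month - 1 + n
    return date(d.year + m // 12, m % 12 + 1, 1)

def months_between(end, start):
    # whole months between two month-start dates
    return (end.year - start.year) * 12 + (end.month - start.month)

def expand(d, end_date, sales):
    """Emit (month, sales) pairs; sales only on the first month, 0 on fillers."""
    return [(add_months(d, i), sales if i == 0 else 0)
            for i in range(months_between(end_date, d) + 1)]

print(expand(date(2019, 5, 1), date(2019, 7, 1), 70))
# [(datetime.date(2019, 5, 1), 70), (datetime.date(2019, 6, 1), 0), (datetime.date(2019, 7, 1), 0)]
```

This is exactly how the pharma 2019-05 row turns into the three rows for May, June, and July in the table above.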
Step-3: use the following WinSpec w2 and the aggregate function to calculate the average:
N = 3
w2 = Window.partitionBy('product', 'specialty').orderBy('date').rowsBetween(-N+1,0)
df_new = df2.select("*", F.round(F.sum('sales').over(w2)/N,2).alias(f'{N}month_avg_sales'))
+-------+---------+----------+-----+----------------+
|product|specialty| date|sales|3month_avg_sales|
+-------+---------+----------+-----+----------------+
| A| ENT|2019-08-01| 50| 16.67|
| A| ENT|2019-09-01| 65| 38.33|
| A| ENT|2019-10-01| 0| 38.33|
| A| ENT|2019-11-01| 40| 35.0|
| A| pharma|2019-03-01| 50| 16.67|
| A| pharma|2019-04-01| 60| 36.67|
| A| pharma|2019-05-01| 70| 60.0|
| A| pharma|2019-06-01| 0| 43.33|
| A| pharma|2019-07-01| 0| 23.33|
| A| pharma|2019-08-01| 80| 26.67|
+-------+---------+----------+-----+----------------+
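To double-check the final numbers: the window rowsBetween(-N+1, 0) sums the current row plus up to two preceding rows, always dividing by the fixed N = 3 (so the first months are divided by 3 even though fewer rows exist). This can be reproduced in plain Python on the gap-filled sales:

```python
def rolling_3mo_avg(sales, n=3):
    """Average over the current and up to n-1 preceding values, always dividing by n."""
    return [round(sum(sales[max(0, i - n + 1): i + 1]) / n, 2)
            for i in range(len(sales))]

pharma = [50, 60, 70, 0, 0, 80]   # months 2019-03 .. 2019-08, gaps filled with 0
print(rolling_3mo_avg(pharma))
# [16.67, 36.67, 60.0, 43.33, 23.33, 26.67]
```

These match the 3month_avg_sales values in the output table.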