How to compute the sum of orders over a 12-month period sliding by 1 month per customer in Spark
Question
I am relatively new to Spark with Scala. Currently I am trying to aggregate order data in Spark over a 12-month period that slides monthly.
Below is a simple sample of my data. I tried to format it so you can easily test it:
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
var sample = Seq(("C1","01/01/2016", 20), ("C1","02/01/2016", 5),
("C1","03/01/2016", 2), ("C1","04/01/2016", 3), ("C1","05/01/2017", 5),
("C1","08/01/2017", 5), ("C1","01/02/2017", 10), ("C1","01/02/2017", 10),
("C1","01/03/2017", 10)).toDF("id","order_date", "orders")
sample = sample.withColumn("order_date",
to_date(unix_timestamp($"order_date", "dd/MM/yyyy").cast("timestamp")))
sample.show
+---+----------+------+
| id|order_date|orders|
+---+----------+------+
| C1|2016-01-01|    20|
| C1|2016-01-02|     5|
| C1|2016-01-03|     2|
| C1|2016-01-04|     3|
| C1|2017-01-05|     5|
| C1|2017-01-08|     5|
| C1|2017-02-01|    10|
| C1|2017-02-01|    10|
| C1|2017-03-01|    10|
+---+----------+------+
The outcome I am required to produce is the following:
id period_start period_end rolling
C1 2015-01-01 2016-01-01 30
C1 2016-01-01 2017-01-01 40
C1 2016-02-01 2017-02-01 30
C1 2016-03-01 2017-03-01 40
What I have tried so far:
I collapsed the dates per customer to the first day of the month (i.e. 2016-01-[1..31] >> 2016-01-01):
import org.joda.time._
val collapse_month = (month:Integer, year:Integer ) => {
var dt = new DateTime().withYear(year)
.withMonthOfYear(month)
.withDayOfMonth(1)
dt.toString("yyyy-MM-dd")
}
val collapse_month_udf = udf(collapse_month)
sample = sample.withColumn("period_end",
collapse_month_udf(
month(col("order_date")),
year(col("order_date"))
).as("date"))
sample.groupBy($"id", $"period_end")
.agg(sum($"orders").as("orders"))
.orderBy("period_end").show
+---+----------+------+
| id|period_end|orders|
+---+----------+------+
| C1|2016-01-01|    30|
| C1|2017-01-01|    10|
| C1|2017-02-01|    20|
| C1|2017-03-01|    10|
+---+----------+------+
I tried the provided window function, but I was not able to express a 12-month window sliding by one month with it.
I am really not sure what the best way to proceed from this point is, ideally one that would not take 5 hours given how much data I have to work with.
Any help would be appreciated.
Recommended answer
"I tried the provided window function but I was not able to use 12 months sliding by one option."
You can still use window with longer intervals, but all parameters have to be expressed in days or weeks:
window($"order_date", "365 days", "28 days")
Unfortunately, window won't respect month or year boundaries, so it won't be that useful for you.
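To see why fixed-day slides do not line up with calendar months, here is a minimal plain-Scala sketch of the arithmetic only. It does not involve Spark, and `WindowDrift` and `slideStarts` are hypothetical names introduced for illustration. It lists the dates you get by stepping forward a fixed number of days from a given start.

```scala
import java.time.LocalDate

object WindowDrift {
  // Dates obtained by repeatedly stepping `slideDays` days forward from `first`.
  // This only illustrates the calendar arithmetic; it is not how Spark
  // chooses its actual window boundaries.
  def slideStarts(first: LocalDate, slideDays: Int, count: Int): Seq[LocalDate] =
    (0 until count).map(i => first.plusDays(i.toLong * slideDays))
}
```

For example, `WindowDrift.slideStarts(LocalDate.of(2016, 1, 1), 28, 4)` yields 2016-01-01, 2016-01-29, 2016-02-26 and 2016-03-25, so after the first step the boundaries already fall in the middle of a month.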
Personally, I would aggregate the data by month first:
val byMonth = sample
.groupBy($"id", trunc($"order_date", "month").alias("order_month"))
.agg(sum($"orders").alias("orders"))
+---+-----------+------+
| id|order_month|orders|
+---+-----------+------+
| C1| 2017-01-01|    10|
| C1| 2016-01-01|    30|
| C1| 2017-02-01|    20|
| C1| 2017-03-01|    10|
+---+-----------+------+
Create a reference date range:
import java.time.temporal.ChronoUnit
val Row(start: java.sql.Date, end: java.sql.Date) = byMonth
.select(min($"order_month"), max($"order_month"))
.first
val months = (0L to ChronoUnit.MONTHS.between(
start.toLocalDate, end.toLocalDate))
.map(i => java.sql.Date.valueOf(start.toLocalDate.plusMonths(i)))
.toDF("order_month")
and combine it with the unique ids:
val ref = byMonth.select($"id").distinct.crossJoin(months)
and join it with the source data:
val expanded = ref.join(byMonth, Seq("id", "order_month"), "leftouter")
+---+-----------+------+
| id|order_month|orders|
+---+-----------+------+
| C1| 2016-01-01|    30|
| C1| 2016-02-01|  null|
| C1| 2016-03-01|  null|
| C1| 2016-04-01|  null|
| C1| 2016-05-01|  null|
| C1| 2016-06-01|  null|
| C1| 2016-07-01|  null|
| C1| 2016-08-01|  null|
| C1| 2016-09-01|  null|
| C1| 2016-10-01|  null|
| C1| 2016-11-01|  null|
| C1| 2016-12-01|  null|
| C1| 2017-01-01|    10|
| C1| 2017-02-01|    20|
| C1| 2017-03-01|    10|
+---+-----------+------+
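The reference range is just every month start between the earliest and latest order month, both ends included. As a rough plain-Scala sketch of that step on its own (no Spark; `MonthRange` and `monthsBetweenInclusive` are hypothetical names used only here):

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object MonthRange {
  // Every month start from `start` to `end`, inclusive of both ends.
  // Assumes both dates are already truncated to the first day of a month.
  def monthsBetweenInclusive(start: LocalDate, end: LocalDate): Seq[LocalDate] =
    (0L to ChronoUnit.MONTHS.between(start, end)).map(i => start.plusMonths(i))
}
```

With the sample data, the range from 2016-01-01 to 2017-03-01 contains 15 months, which is why the expanded table has 15 rows for C1.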
With the data prepared like this, you can use window functions:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"id")
.orderBy($"order_month")
.rowsBetween(-12, Window.currentRow)
expanded.withColumn("rolling", sum("orders").over(w))
.na.drop(Seq("orders"))
.select(
$"order_month" - expr("INTERVAL 12 MONTHS") as "period_start",
$"order_month" as "period_end",
$"rolling")
+------------+----------+-------+
|period_start|period_end|rolling|
+------------+----------+-------+
|  2015-01-01|2016-01-01|     30|
|  2016-01-01|2017-01-01|     40|
|  2016-02-01|2017-02-01|     30|
|  2016-03-01|2017-03-01|     40|
+------------+----------+-------+
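Note that a frame from 12 rows preceding to the current row spans 13 monthly rows, so both period_start and period_end are included, which is what the requested output implies. The plain-Scala sketch below mimics that frame on an ordinary sequence to make the arithmetic easy to check. `RollingCheck` is a hypothetical helper, and it assumes months without orders are represented as 0 rather than null.

```scala
object RollingCheck {
  // For each position, sum the current element and up to `preceding`
  // elements before it, mirroring
  // ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW.
  def rolling(values: Seq[Long], preceding: Int): Seq[Long] =
    values.indices.map { i =>
      values.slice(math.max(0, i - preceding), i + 1).sum
    }
}
```

Applied to the 15 monthly totals from 2016-01 to 2017-03 (30, eleven zeros, then 10, 20, 10) with `preceding = 12`, the values at the months that actually have orders are 30, 40, 30 and 40, matching the table above. With `preceding = 11` the last three would instead be 10, 30 and 40.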
Please be advised that this is a very expensive operation, requiring at least two shuffles:
== Physical Plan ==
*Project [cast(cast(order_month#104 as timestamp) - interval 1 years as date) AS period_start#1387, order_month#104 AS period_end#1388, rolling#1375L]
+- *Filter AtLeastNNulls(n, orders#55L)
+- Window [sum(orders#55L) windowspecdefinition(id#7, order_month#104 ASC NULLS FIRST, ROWS BETWEEN 12 PRECEDING AND CURRENT ROW) AS rolling#1375L], [id#7], [order_month#104 ASC NULLS FIRST]
+- *Sort [id#7 ASC NULLS FIRST, order_month#104 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#7, 200)
+- *Project [id#7, order_month#104, orders#55L]
+- *BroadcastHashJoin [id#7, order_month#104], [id#181, order_month#49], LeftOuter, BuildRight
:- BroadcastNestedLoopJoin BuildRight, Cross
: :- *HashAggregate(keys=[id#7], functions=[])
: : +- Exchange hashpartitioning(id#7, 200)
: : +- *HashAggregate(keys=[id#7], functions=[])
: : +- *HashAggregate(keys=[id#7, trunc(order_date#14, month)#1394], functions=[])
: : +- Exchange hashpartitioning(id#7, trunc(order_date#14, month)#1394, 200)
: : +- *HashAggregate(keys=[id#7, trunc(order_date#14, month) AS trunc(order_date#14, month)#1394], functions=[])
: : +- LocalTableScan [id#7, order_date#14]
: +- BroadcastExchange IdentityBroadcastMode
: +- LocalTableScan [order_month#104]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, date, true]))
+- *HashAggregate(keys=[id#181, trunc(order_date#14, month)#1395], functions=[sum(cast(orders#183 as bigint))])
+- Exchange hashpartitioning(id#181, trunc(order_date#14, month)#1395, 200)
+- *HashAggregate(keys=[id#181, trunc(order_date#14, month) AS trunc(order_date#14, month)#1395], functions=[partial_sum(cast(orders#183 as bigint))])
+- LocalTableScan [id#181, order_date#14, orders#183]
It is also possible to express this using a rangeBetween frame, but you have to encode the data first:
val encoded = byMonth
.withColumn("order_month_offset",
// Choose "zero" date appropriate in your scenario
months_between($"order_month", to_date(lit("1970-01-01"))))
val w = Window.partitionBy($"id")
.orderBy($"order_month_offset")
.rangeBetween(-12, Window.currentRow)
encoded.withColumn("rolling", sum($"orders").over(w))
+---+-----------+------+------------------+-------+
| id|order_month|orders|order_month_offset|rolling|
+---+-----------+------+------------------+-------+
| C1| 2016-01-01|    30|             552.0|     30|
| C1| 2017-01-01|    10|             564.0|     40|
| C1| 2017-02-01|    20|             565.0|     30|
| C1| 2017-03-01|    10|             566.0|     40|
+---+-----------+------+------------------+-------+
This makes the join with the reference table unnecessary and simplifies the execution plan.
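The reason no filler rows are needed is that a range frame selects rows by the value of the ordering column, not by row position. A small plain-Scala sketch of that semantics, written for illustration only (`RangeFrame` is a hypothetical helper, quadratic in the number of rows, and not how Spark evaluates the frame):

```scala
object RangeFrame {
  // For each (offset, value) pair, sum the values of all pairs whose offset
  // lies in [offset - preceding, offset], mirroring
  // RANGE BETWEEN `preceding` PRECEDING AND CURRENT ROW.
  def rolling(rows: Seq[(Int, Long)], preceding: Int): Seq[Long] =
    rows.map { case (offset, _) =>
      rows.collect {
        case (o, v) if o >= offset - preceding && o <= offset => v
      }.sum
    }
}
```

Feeding it the four encoded rows from the table, `Seq((552, 30L), (564, 10L), (565, 20L), (566, 10L))` with `preceding = 12`, gives 30, 40, 30 and 40 even though the eleven empty months of 2016 are absent from the input.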