Select with window function (dense_rank()) in SparkSQL
Problem description
I have a table of customer purchase records, and I need to assign each purchase to a datetime window. One window is 8 days long: if I make a purchase today and another one 5 days from now, both fall in window 1, but if I make one today and the next one 8 days from now, the first purchase is in window 1 and the last purchase is in window 2.
create temporary table transactions
(client_id int,
transaction_ts datetime,
store_id int);
insert into transactions values
(1,'2018-06-01 12:17:37', 1),
(1,'2018-06-02 13:17:37', 2),
(1,'2018-06-03 14:17:37', 3),
(1,'2018-06-09 10:17:37', 2),
(2,'2018-06-02 10:17:37', 1),
(2,'2018-06-02 13:17:37', 2),
(2,'2018-06-08 14:19:37', 3),
(2,'2018-06-16 13:17:37', 2),
(2,'2018-06-17 14:17:37', 3);
The window is 8 days. The problem is that I don't understand how to make dense_rank() OVER (PARTITION BY ...) look at the datetime and form 8-day windows. As a result I need something like this, where the last column is the window number:
1,'2018-06-01 12:17:37', 1,1
1,'2018-06-02 13:17:37', 2,1
1,'2018-06-03 14:17:37', 3,1
1,'2018-06-09 10:17:37', 2,2
2,'2018-06-02 10:17:37', 1,1
2,'2018-06-02 13:17:37', 2,1
2,'2018-06-08 14:19:37', 3,2
2,'2018-06-16 13:17:37', 2,3
2,'2018-06-17 14:17:37', 3,3
Any idea how to get this? I can run it in MySQL or Spark SQL, but MySQL doesn't support PARTITION BY. I still can't find a solution, so any help is appreciated.
Recommended answer
You can most likely solve this in Spark SQL by combining a time window with a window function partitioned by client:
val purchases = Seq(
  (1, "2018-06-01 12:17:37", 1),
  (1, "2018-06-02 13:17:37", 2),
  (1, "2018-06-03 14:17:37", 3),
  (1, "2018-06-09 10:17:37", 2),
  (2, "2018-06-02 10:17:37", 1),
  (2, "2018-06-02 13:17:37", 2),
  (2, "2018-06-08 14:19:37", 3),
  (2, "2018-06-16 13:17:37", 2),
  (2, "2018-06-17 14:17:37", 3)
).toDF("client_id", "transaction_ts", "store_id")
purchases.show(false)
+---------+-------------------+--------+
|client_id|transaction_ts     |store_id|
+---------+-------------------+--------+
|1        |2018-06-01 12:17:37|1       |
|1        |2018-06-02 13:17:37|2       |
|1        |2018-06-03 14:17:37|3       |
|1        |2018-06-09 10:17:37|2       |
|2        |2018-06-02 10:17:37|1       |
|2        |2018-06-02 13:17:37|2       |
|2        |2018-06-08 14:19:37|3       |
|2        |2018-06-16 13:17:37|2       |
|2        |2018-06-17 14:17:37|3       |
+---------+-------------------+--------+
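import org.apache.spark.sql.expressions.Window
val groupedByTimeWindow = purchases.groupBy($"client_id", window($"transaction_ts", "8 days")).agg(collect_list("transaction_ts").as("transaction_tss"), collect_list("store_id").as("store_ids"))
// Assumed definition: order each client's 8-day windows by their start time
val windowByClient = Window.partitionBy("client_id").orderBy("window.start")
val withWindowNumber = groupedByTimeWindow.withColumn("window_number", row_number().over(windowByClient))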
withWindowNumber.orderBy("client_id", "window.start").show(false)
+---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+
|client_id|window                                       |transaction_tss                                                |store_ids|window_number|
+---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+
|1        |[2018-05-28 17:00:00.0,2018-06-05 17:00:00.0]|[2018-06-01 12:17:37, 2018-06-02 13:17:37, 2018-06-03 14:17:37]|[1, 2, 3]|1            |
|1        |[2018-06-05 17:00:00.0,2018-06-13 17:00:00.0]|[2018-06-09 10:17:37]                                          |[2]      |2            |
|2        |[2018-05-28 17:00:00.0,2018-06-05 17:00:00.0]|[2018-06-02 10:17:37, 2018-06-02 13:17:37]                     |[1, 2]   |1            |
|2        |[2018-06-05 17:00:00.0,2018-06-13 17:00:00.0]|[2018-06-08 14:19:37]                                          |[3]      |2            |
|2        |[2018-06-13 17:00:00.0,2018-06-21 17:00:00.0]|[2018-06-16 13:17:37, 2018-06-17 14:17:37]                     |[2, 3]   |3            |
+---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+
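Note that the window boundaries in this output (2018-05-28 17:00 and so on) are not anchored to each client's first purchase. By default Spark's window function aligns fixed-size windows to 1970-01-01 00:00:00 UTC, and the 17:00 shown here presumably reflects a session time zone of UTC-7. A minimal sketch of that arithmetic in plain Scala follows; windowStart is a hypothetical helper written for illustration, not a Spark API:

```scala
import java.time.{Duration, Instant}

// Start of the fixed-size window containing `ts`,
// with windows aligned to the Unix epoch (1970-01-01T00:00:00Z).
def windowStart(ts: Instant, size: Duration): Instant = {
  val sizeSec = size.getSeconds
  val startSec = Math.floorDiv(ts.getEpochSecond, sizeSec) * sizeSec
  Instant.ofEpochSecond(startSec)
}

val eightDays = Duration.ofDays(8)

// 2018-06-01 12:17:37 at UTC-7 (assumed session time zone) is 19:17:37 UTC.
println(windowStart(Instant.parse("2018-06-01T19:17:37Z"), eightDays))
// 2018-05-29T00:00:00Z, i.e. 2018-05-28 17:00 at UTC-7
```

If the windows should start at a different offset, the window function also has an overload that takes a slide duration and a start time.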
If needed, you can explode the list elements in store_ids or transaction_tss to get back to one row per purchase.
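A rough sketch of that step, assuming the same spark-shell session as above (so purchases and withWindowNumber already exist) and Spark 2.4 or later for arrays_zip. It also assumes the two collected lists keep their elements in matching order, which Spark does not formally guarantee. The second half is an alternative that skips collect/explode and applies dense_rank() per row, as in the question's expected output. The names perPurchase, numbered, p and w are made up for this example; in older shells, paste the multi-line expressions with :paste.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{arrays_zip, dense_rank, explode, window}

// Option A: explode the grouped result back to one row per purchase.
// arrays_zip pairs the i-th timestamp with the i-th store id.
val perPurchase = withWindowNumber
  .select($"client_id", $"window_number",
    explode(arrays_zip($"transaction_tss", $"store_ids")).as("p"))
  .select($"client_id", $"p.transaction_tss".as("transaction_ts"),
    $"p.store_ids".as("store_id"), $"window_number")

// Option B: tag each row with its 8-day window, then rank the windows
// per client by start time. No grouping or exploding is needed.
val numbered = purchases
  .withColumn("w", window($"transaction_ts", "8 days"))
  .withColumn("window_number",
    dense_rank().over(Window.partitionBy("client_id").orderBy($"w.start")))
  .drop("w")

numbered.orderBy("client_id", "transaction_ts").show(false)
```

Option B uses dense_rank() rather than row_number() because several purchases share the same window and must receive the same number.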
Hope this helps!