这个用例可以通过spark的滞后/任何其他功能来完成吗,如果可以,这怎么做 [英] can this use-case be done by lag/any-other function of spark, if so how can this done
问题描述
我使用的是 spark-2.4.1v.我在我的项目中有一个用例,对于每个日期(process_date),我需要考虑当天记录和前一天记录,并对该数据集执行某些其他操作.那么如何为此准备数据集呢?我尝试了滞后功能,但没有取得太大的成功.
对于上述用例,给定的数据如下:
+----------+----------+----+-------+------------+------------+|公司编号|gen_date|年|季度|total_assets|create_date|+------------+------------+----+-------+------------+-----------+|989856662|2019-01-02|2019|1|3900.435058|2019-09-11||989856665|2019-01-02|2019|1|4836.435058|2019-09-11||989856667|2019-01-02|2019|1|5836.435058|2019-09-11||989856662|2019-01-01|2019|1|3800.435058|2019-09-11||989856665|2019-01-01|2019|1|3834.435058|2019-09-11||989856667|2019-01-01|2019|1|5834.435058|2019-09-11||989856662|2018-12-31|2018|4|3700.435058|2019-09-11||989856665|2018-12-31|2018|4|3900.435058|2019-09-11||989856667|2018-12-31|2018|4|5833.435058|2019-09-11||989856662|2018-12-30|2018|4|3832.435058|2019-09-11||989856665|2018-12-30|2018|4|3700.435058|2019-09-11||989856667|2018-12-30|2018|4|5832.435058|2019-09-11|+------------+------------+----+-------+------------+-----------+
<块引用>
这里 gen_date 是关键列,对于每个 gen_date ,我需要取其先前可用的 gen_date 记录.这些将被处理一起设置,即 process_date 2019-01-02 - 它应该有记录 2019-01-02 &2019-01-01 同样适用于 process_date 记录gen_date 2018-12-30 &它之前的 gen_date 即 2018-12-29,但是这里在 2018-12-29 gen_date 记录不可用,因此应该是考虑 gen_date 2018-12-30 记录.
在给定的集合中
对于 process_date 2019-01-02 =>( gen_date 2019-01-02 ) 的记录 + ( gen_date 2019-01-01 ) 的记录对于 process_date 2019-01-01 =>( gen_date 2019-01-01 ) 的记录 + ( gen_date 2018-12-31 ) 的记录对于 process_date 2018-12-31 =>( gen_date 2018-12-31 ) 的记录 + ( gen_date 2018-12-30 ) 的记录对于 process_date 2018-12-30 =>( gen_date 2018-12-30 ) + 没有以前的 gen_date 记录.
输出应该如下:
+----------+------------+----------+----+-------+------------+------------+|company_id|process_date|gen_date|年|季度|total_assets|create_date|+------------+------------+---------+----+-------+------------+-----------+|989856662|2019-01-02|2019-01-02|2019|1|3900.435058|2019-09-11||989856662|2019-01-02|2019-01-01|2019|1|3800.435058|2019-09-11||989856665|2019-01-02|2019-01-02|2019|1|4836.435058|2019-09-11||989856665|2019-01-02|2019-01-01|2019|1|3834.435058|2019-09-11||989856667|2019-01-02|2019-01-02|2019|1|5836.435058|2019-09-11||989856667|2019-01-02|2019-01-01|2019|1|5834.435058|2019-09-11||989856662|2019-01-01|2019-01-01|2019|1|3800.435058|2019-09-11||989856662|2019-01-01|2018-12-31|2018|4|3700.435058|2019-09-11||989856665|2019-01-01|2019-01-01|2019|1|3834.435058|2019-09-11||989856665|2019-01-01|2018-12-31|2018|4|3900.435058|2019-09-11||989856667|2019-01-01|2019-01-01|2019|1|5834.435058|2019-09-11||989856667|2019-01-01|2018-12-31|2018|4|5833.435058|2019-09-11||989856662|2018-12-31|2018-12-31|2018|4|3700.435058|2019-09-11||989856662|2018-12-31|2018-12-30|2018|4|3832.435058|2019-09-11||989856665|2018-12-31|2018-12-31|2018|4|3900.435058|2019-09-11||989856665|2018-12-31|2018-12-30|2018|4|3700.435058|2019-09-11||989856667|2018-12-31|2018-12-31|2018|4|5833.435058|2019-09-11||989856667|2018-12-31|2018-12-30|2018|4|5832.435058|2019-09-11||989856662|2018-12-30|2018-12-30|2018|4|3832.435058|2019-09-11||989856665|2018-12-30|2018-12-30|2018|4|3700.435058|2019-09-11||989856667|2018-12-30|2018-12-30|2018|4|5832.435058|2019-09-11|+------------+------------+---------+----+-------+------------+-----------+如何实现上述输出?
以下是相同的附加笔记本网址.
为了获取给定 gen_date
和 company_id
的前一天详细信息,您可以使用滞后函数如下spec
,
val windowSpec = Window.partitionBy("company_id").orderBy("gen_date")val中间DF = finDF.withColumn(previous_gen_date", lag(gen_date",1).over(windowSpec))
以上步骤将根据 company_id 和 gen_date 为您获取上一代日期,您可以将此数据与原始数据连接以获得相关的前一天数据.
val finalDF = middleDF.alias(a").join(finDF.alias(b"), col(a.company_id") === col(b.company_id") &&col("a.previous_gen_date") === col("b.gen_date"), "left_outer").select(col(a.*"),col("b.year").as("previous_gen_date_year"),col("b.quarter").as("previous_gen_date_quarter"),col("b.total_assets").as("previous_gen_date_total_assets"),col("b.create_date").as("previous_gen_date_create_date"))
上述连接将产生前一天的完整数据以及生成日期.
+----------+----------+----+-------+------------+-----------+------------------+---------------------+------------------------------+------------------------------+------------------------------+|company_id|gen_date |年|季度|total_assets|create_date|previous_gen_date|previous_gen_date_year|previous_gen_date_quarter|previous_gen_date_total_assets|previous_gen_date_create_date|+------------+------------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+----------------------+-----------------------------+|989856662 |2018-12-30|2018|4 |3832.435058 |2019-09-11 |null |null |null |null |null ||989856662 |2018-12-31|2018|4 |3700.435058 |2019-09-11 |2018-12-30 |2018 |4 |3832.435058 |2019-09-11 ||989856662 |2019-01-01|2019|1 |3800.435058 |2019-09-11 |2018-12-31 |2018 |4 |3700.435058 |2019-09-11 ||989856662 |2019-01-02|2019|1 |3900.435058 |2019-09-11 |2019-01-01 |2019 |1 |3800.435058 |2019-09-11 |+------------+------------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+----------------------+-----------------------------+
在这里,您的 gen_date
也可以作为 process_date
列,通过它您可以比较任何操作的两天数据.
I am using spark-2.4.1v. I have a use-case in my project where , for each date(process_date) I need to consider that day record with previous day record, and do certain other operations on that data-set. So how can prepare data-set for this ? I tried with lag function but not got much success.
For the above Use-case, given data as below :
+----------+----------+----+-------+------------+-----------+
|company_id| gen_date|year|quarter|total_assets|create_date|
+----------+----------+----+-------+------------+-----------+
| 989856662|2019-01-02|2019| 1| 3900.435058| 2019-09-11|
| 989856665|2019-01-02|2019| 1| 4836.435058| 2019-09-11|
| 989856667|2019-01-02|2019| 1| 5836.435058| 2019-09-11|
| 989856662|2019-01-01|2019| 1| 3800.435058| 2019-09-11|
| 989856665|2019-01-01|2019| 1| 3834.435058| 2019-09-11|
| 989856667|2019-01-01|2019| 1| 5834.435058| 2019-09-11|
| 989856662|2018-12-31|2018| 4| 3700.435058| 2019-09-11|
| 989856665|2018-12-31|2018| 4| 3900.435058| 2019-09-11|
| 989856667|2018-12-31|2018| 4| 5833.435058| 2019-09-11|
| 989856662|2018-12-30|2018| 4| 3832.435058| 2019-09-11|
| 989856665|2018-12-30|2018| 4| 3700.435058| 2019-09-11|
| 989856667|2018-12-30|2018| 4| 5832.435058| 2019-09-11|
+----------+----------+----+-------+------------+-----------+
Here gen_date is the key column , for each gen_date ,I need to take its previous available gen_date record. these will be processed together as set i.e. for process_date 2019-01-02 -- it should have records of 2019-01-02 & 2019-01-01 like wise for process_date records of gen_date 2018-12-30 & its previous gen_date i.e. 2018-12-29, but here in 2018-12-29 gen_date records are not available hence should be considered gen_date 2018-12-30 records.
In the given set
For process_date 2019-01-02 => records of ( gen_date 2019-01-02 ) + records of ( gen_date 2019-01-01) For process_date 2019-01-01 => records of ( gen_date 2019-01-01 ) + records of ( gen_date 2018-12-31) For process_date 2018-12-31 => records of ( gen_date 2018-12-31 ) + records of ( gen_date 2018-12-30) For process_date 2018-12-30 => records of ( gen_date 2018-12-30 ) + no previous gen_date records.
The out put should be as below :
+----------+------------+----------+----+-------+------------+-----------+
|company_id|process_date| gen_date|year|quarter|total_assets|create_date|
+----------+------------+----------+----+-------+------------+-----------+
| 989856662| 2019-01-02|2019-01-02|2019| 1| 3900.435058| 2019-09-11|
| 989856662| 2019-01-02|2019-01-01|2019| 1| 3800.435058| 2019-09-11|
| 989856665| 2019-01-02|2019-01-02|2019| 1| 4836.435058| 2019-09-11|
| 989856665| 2019-01-02|2019-01-01|2019| 1| 3834.435058| 2019-09-11|
| 989856667| 2019-01-02|2019-01-02|2019| 1| 5836.435058| 2019-09-11|
| 989856667| 2019-01-02|2019-01-01|2019| 1| 5834.435058| 2019-09-11|
| 989856662| 2019-01-01|2019-01-01|2019| 1| 3800.435058| 2019-09-11|
| 989856662| 2019-01-01|2018-12-31|2018| 4| 3700.435058| 2019-09-11|
| 989856665| 2019-01-01|2019-01-01|2019| 1| 3834.435058| 2019-09-11|
| 989856665| 2019-01-01|2018-12-31|2018| 4| 3900.435058| 2019-09-11|
| 989856667| 2019-01-01|2019-01-01|2019| 1| 5834.435058| 2019-09-11|
| 989856667| 2019-01-01|2018-12-31|2018| 4| 5833.435058| 2019-09-11|
| 989856662| 2018-12-31|2018-12-31|2018| 4| 3700.435058| 2019-09-11|
| 989856662| 2018-12-31|2018-12-30|2018| 4| 3832.435058| 2019-09-11|
| 989856665| 2018-12-31|2018-12-31|2018| 4| 3900.435058| 2019-09-11|
| 989856665| 2018-12-31|2018-12-30|2018| 4| 3700.435058| 2019-09-11|
| 989856667| 2018-12-31|2018-12-31|2018| 4| 5833.435058| 2019-09-11|
| 989856667| 2018-12-31|2018-12-30|2018| 4| 5832.435058| 2019-09-11|
| 989856662| 2018-12-30|2018-12-30|2018| 4| 3832.435058| 2019-09-11|
| 989856665| 2018-12-30|2018-12-30|2018| 4| 3700.435058| 2019-09-11|
| 989856667| 2018-12-30|2018-12-30|2018| 4| 5832.435058| 2019-09-11|
+----------+------------+----------+----+-------+------------+-----------+
how to achieve above output ?
Below is the attached notebook url for the same.
In order to get the previous day details for the given gen_date
and the company_id
, you can use the lag function as with the following spec
,
val windowSpec = Window.partitionBy("company_id").orderBy("gen_date")
val intermediateDF = finDF
.withColumn("previous_gen_date", lag("gen_date",1).over(windowSpec))
The above step will get you the previous generation date based on the company_id and the gen_date and you can join this data with your original data to get the relevant previous day data.
val finalDF = intermediateDF.alias("a")
.join(finDF.alias("b"), col("a.company_id") === col("b.company_id") &&
col("a.previous_gen_date") === col("b.gen_date"), "left_outer")
.select(col("a.*"),
col("b.year").as("previous_gen_date_year"),
col("b.quarter").as("previous_gen_date_quarter"),
col("b.total_assets").as("previous_gen_date_total_assets"),
col("b.create_date").as("previous_gen_date_create_date")
)
The above join will yield the complete data for the previous day along with generate date.
+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
|company_id|gen_date |year|quarter|total_assets|create_date|previous_gen_date|previous_gen_date_year|previous_gen_date_quarter|previous_gen_date_total_assets|previous_gen_date_create_date|
+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
|989856662 |2018-12-30|2018|4 |3832.435058 |2019-09-11 |null |null |null |null |null |
|989856662 |2018-12-31|2018|4 |3700.435058 |2019-09-11 |2018-12-30 |2018 |4 |3832.435058 |2019-09-11 |
|989856662 |2019-01-01|2019|1 |3800.435058 |2019-09-11 |2018-12-31 |2018 |4 |3700.435058 |2019-09-11 |
|989856662 |2019-01-02|2019|1 |3900.435058 |2019-09-11 |2019-01-01 |2019 |1 |3800.435058 |2019-09-11 |
+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
Here your gen_date
can also act as process_date
column, with this you compare the two days worth of data for any operation.
这篇关于这个用例可以通过spark的滞后/任何其他功能来完成吗,如果可以,这怎么做的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!