这个用例可以通过spark的滞后/任何其他功能来完成吗,如果可以,这怎么做 [英] can this use-case be done by lag/any-other function of spark, if so how can this done

查看:55
本文介绍了这个用例可以通过spark的滞后/任何其他功能来完成吗,如果可以,这怎么做的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的是 spark-2.4.1v.我在我的项目中有一个用例,对于每个日期(process_date),我需要考虑当天记录和前一天记录,并对该数据集执行某些其他操作.那么如何为此准备数据集呢?我尝试了滞后功能,但没有取得太大的成功.

对于上述用例,给定的数据如下:

+----------+----------+----+-------+------------+------------+|公司编号|gen_date|年|季度|total_assets|create_date|+------------+------------+----+-------+------------+-----------+|989856662|2019-01-02|2019|1|3900.435058|2019-09-11||989856665|2019-01-02|2019|1|4836.435058|2019-09-11||989856667|2019-01-02|2019|1|5836.435058|2019-09-11||989856662|2019-01-01|2019|1|3800.435058|2019-09-11||989856665|2019-01-01|2019|1|3834.435058|2019-09-11||989856667|2019-01-01|2019|1|5834.435058|2019-09-11||989856662|2018-12-31|2018|4|3700.435058|2019-09-11||989856665|2018-12-31|2018|4|3900.435058|2019-09-11||989856667|2018-12-31|2018|4|5833.435058|2019-09-11||989856662|2018-12-30|2018|4|3832.435058|2019-09-11||989856665|2018-12-30|2018|4|3700.435058|2019-09-11||989856667|2018-12-30|2018|4|5832.435058|2019-09-11|+------------+------------+----+-------+------------+-----------+

<块引用>

这里 gen_date 是关键列,对于每个 gen_date ,我需要取其先前可用的 gen_date 记录.这些将被处理一起设置,即 process_date 2019-01-02 - 它应该有记录 2019-01-02 &2019-01-01 同样适用于 process_date 记录gen_date 2018-12-30 &它之前的 gen_date 即 2018-12-29,但是这里在 2018-12-29 gen_date 记录不可用,因此应该是考虑 gen_date 2018-12-30 记录.

在给定的集合中

对于 process_date 2019-01-02 =>( gen_date 2019-01-02 ) 的记录 + ( gen_date 2019-01-01 ) 的记录对于 process_date 2019-01-01 =>( gen_date 2019-01-01 ) 的记录 + ( gen_date 2018-12-31 ) 的记录对于 process_date 2018-12-31 =>( gen_date 2018-12-31 ) 的记录 + ( gen_date 2018-12-30 ) 的记录对于 process_date 2018-12-30 =>( gen_date 2018-12-30 ) + 没有以前的 gen_date 记录.

输出应该如下:

+----------+------------+----------+----+-------+------------+------------+|company_id|process_date|gen_date|年|季度|total_assets|create_date|+------------+------------+---------+----+-------+------------+-----------+|989856662|2019-01-02|2019-01-02|2019|1|3900.435058|2019-09-11||989856662|2019-01-02|2019-01-01|2019|1|3800.435058|2019-09-11||989856665|2019-01-02|2019-01-02|2019|1|4836.435058|2019-09-11||989856665|2019-01-02|2019-01-01|2019|1|3834.435058|2019-09-11||989856667|2019-01-02|2019-01-02|2019|1|5836.435058|2019-09-11||989856667|2019-01-02|2019-01-01|2019|1|5834.435058|2019-09-11||989856662|2019-01-01|2019-01-01|2019|1|3800.435058|2019-09-11||989856662|2019-01-01|2018-12-31|2018|4|3700.435058|2019-09-11||989856665|2019-01-01|2019-01-01|2019|1|3834.435058|2019-09-11||989856665|2019-01-01|2018-12-31|2018|4|3900.435058|2019-09-11||989856667|2019-01-01|2019-01-01|2019|1|5834.435058|2019-09-11||989856667|2019-01-01|2018-12-31|2018|4|5833.435058|2019-09-11||989856662|2018-12-31|2018-12-31|2018|4|3700.435058|2019-09-11||989856662|2018-12-31|2018-12-30|2018|4|3832.435058|2019-09-11||989856665|2018-12-31|2018-12-31|2018|4|3900.435058|2019-09-11||989856665|2018-12-31|2018-12-30|2018|4|3700.435058|2019-09-11||989856667|2018-12-31|2018-12-31|2018|4|5833.435058|2019-09-11||989856667|2018-12-31|2018-12-30|2018|4|5832.435058|2019-09-11||989856662|2018-12-30|2018-12-30|2018|4|3832.435058|2019-09-11||989856665|2018-12-30|2018-12-30|2018|4|3700.435058|2019-09-11||989856667|2018-12-30|2018-12-30|2018|4|5832.435058|2019-09-11|+------------+------------+---------+----+-------+------------+-----------+如何实现上述输出?

以下是相同的附加笔记本网址.

解决方案

为了获取给定 gen_datecompany_id 的前一天详细信息,您可以使用滞后函数如下spec,

val windowSpec = Window.partitionBy("company_id").orderBy("gen_date")val中间DF = finDF.withColumn(previous_gen_date", lag(gen_date",1).over(windowSpec))

以上步骤将根据 company_id 和 gen_date 为您获取上一代日期,您可以将此数据与原始数据连接以获得相关的前一天数据.

val finalDF = middleDF.alias(a").join(finDF.alias(b"), col(a.company_id") === col(b.company_id") &&col("a.previous_gen_date") === col("b.gen_date"), "left_outer").select(col(a.*"),col("b.year").as("previous_gen_date_year"),col("b.quarter").as("previous_gen_date_quarter"),col("b.total_assets").as("previous_gen_date_total_assets"),col("b.create_date").as("previous_gen_date_create_date"))

上述连接将产生前一天的完整数据以及生成日期.

+----------+----------+----+-------+------------+-----------+------------------+---------------------+------------------------------+------------------------------+------------------------------+|company_id|gen_date |年|季度|total_assets|create_date|previous_gen_date|previous_gen_date_year|previous_gen_date_quarter|previous_gen_date_total_assets|previous_gen_date_create_date|+------------+------------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+----------------------+-----------------------------+|989856662 |2018-12-30|2018|4 |3832.435058 |2019-09-11 |null |null |null |null |null ||989856662 |2018-12-31|2018|4 |3700.435058 |2019-09-11 |2018-12-30 |2018 |4 |3832.435058 |2019-09-11 ||989856662 |2019-01-01|2019|1 |3800.435058 |2019-09-11 |2018-12-31 |2018 |4 |3700.435058 |2019-09-11 ||989856662 |2019-01-02|2019|1 |3900.435058 |2019-09-11 |2019-01-01 |2019 |1 |3800.435058 |2019-09-11 |+------------+------------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+----------------------+-----------------------------+

在这里,您的 gen_date 也可以作为 process_date 列,通过它您可以比较任何操作的两天数据.

I am using spark-2.4.1v. I have a use-case in my project where , for each date(process_date) I need to consider that day record with previous day record, and do certain other operations on that data-set. So how can prepare data-set for this ? I tried with lag function but not got much success.

For the above Use-case, given data as below :

+----------+----------+----+-------+------------+-----------+
|company_id|  gen_date|year|quarter|total_assets|create_date|
+----------+----------+----+-------+------------+-----------+
| 989856662|2019-01-02|2019|      1| 3900.435058| 2019-09-11|
| 989856665|2019-01-02|2019|      1| 4836.435058| 2019-09-11|
| 989856667|2019-01-02|2019|      1| 5836.435058| 2019-09-11|
| 989856662|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856665|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856667|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856662|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856665|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856667|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856662|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
+----------+----------+----+-------+------------+-----------+

Here gen_date is the key column , for each gen_date ,I need to take its previous available gen_date record. these will be processed together as set i.e. for process_date 2019-01-02 -- it should have records of 2019-01-02 & 2019-01-01 like wise for process_date records of gen_date 2018-12-30 & its previous gen_date i.e. 2018-12-29, but here in 2018-12-29 gen_date records are not available hence should be considered gen_date 2018-12-30 records.

In the given set

For process_date 2019-01-02 => records of ( gen_date 2019-01-02 ) + records of ( gen_date 2019-01-01) For process_date 2019-01-01 => records of ( gen_date 2019-01-01 ) + records of ( gen_date 2018-12-31) For process_date 2018-12-31 => records of ( gen_date 2018-12-31 ) + records of ( gen_date 2018-12-30) For process_date 2018-12-30 => records of ( gen_date 2018-12-30 ) + no previous gen_date records.

The out put should be as below :

+----------+------------+----------+----+-------+------------+-----------+
|company_id|process_date|  gen_date|year|quarter|total_assets|create_date|
+----------+------------+----------+----+-------+------------+-----------+
| 989856662|  2019-01-02|2019-01-02|2019|      1| 3900.435058| 2019-09-11|
| 989856662|  2019-01-02|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856665|  2019-01-02|2019-01-02|2019|      1| 4836.435058| 2019-09-11|
| 989856665|  2019-01-02|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856667|  2019-01-02|2019-01-02|2019|      1| 5836.435058| 2019-09-11|
| 989856667|  2019-01-02|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856662|  2019-01-01|2019-01-01|2019|      1| 3800.435058| 2019-09-11|
| 989856662|  2019-01-01|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856665|  2019-01-01|2019-01-01|2019|      1| 3834.435058| 2019-09-11|
| 989856665|  2019-01-01|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856667|  2019-01-01|2019-01-01|2019|      1| 5834.435058| 2019-09-11|
| 989856667|  2019-01-01|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856662|  2018-12-31|2018-12-31|2018|      4| 3700.435058| 2019-09-11|
| 989856662|  2018-12-31|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|  2018-12-31|2018-12-31|2018|      4| 3900.435058| 2019-09-11|
| 989856665|  2018-12-31|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|  2018-12-31|2018-12-31|2018|      4| 5833.435058| 2019-09-11|
| 989856667|  2018-12-31|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
| 989856662|  2018-12-30|2018-12-30|2018|      4| 3832.435058| 2019-09-11|
| 989856665|  2018-12-30|2018-12-30|2018|      4| 3700.435058| 2019-09-11|
| 989856667|  2018-12-30|2018-12-30|2018|      4| 5832.435058| 2019-09-11|
+----------+------------+----------+----+-------+------------+-----------+

how to achieve above output ?

Below is the attached notebook url for the same.

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/988191344931748/7035720262824085/latest.html

解决方案

In order to get the previous day details for the given gen_date and the company_id, you can use the lag function as with the following spec,

val windowSpec  = Window.partitionBy("company_id").orderBy("gen_date") 

val intermediateDF = finDF
  .withColumn("previous_gen_date", lag("gen_date",1).over(windowSpec))

The above step will get you the previous generation date based on the company_id and the gen_date and you can join this data with your original data to get the relevant previous day data.

val finalDF = intermediateDF.alias("a")
  .join(finDF.alias("b"), col("a.company_id") === col("b.company_id") &&
    col("a.previous_gen_date") === col("b.gen_date"), "left_outer")
    .select(col("a.*"),
      col("b.year").as("previous_gen_date_year"),
      col("b.quarter").as("previous_gen_date_quarter"),
      col("b.total_assets").as("previous_gen_date_total_assets"),
      col("b.create_date").as("previous_gen_date_create_date")
    )

The above join will yield the complete data for the previous day along with generate date.

+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
|company_id|gen_date  |year|quarter|total_assets|create_date|previous_gen_date|previous_gen_date_year|previous_gen_date_quarter|previous_gen_date_total_assets|previous_gen_date_create_date|
+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+
|989856662 |2018-12-30|2018|4      |3832.435058 |2019-09-11 |null             |null                  |null                     |null                          |null                         |
|989856662 |2018-12-31|2018|4      |3700.435058 |2019-09-11 |2018-12-30       |2018                  |4                        |3832.435058                   |2019-09-11                   |
|989856662 |2019-01-01|2019|1      |3800.435058 |2019-09-11 |2018-12-31       |2018                  |4                        |3700.435058                   |2019-09-11                   |
|989856662 |2019-01-02|2019|1      |3900.435058 |2019-09-11 |2019-01-01       |2019                  |1                        |3800.435058                   |2019-09-11                   |
+----------+----------+----+-------+------------+-----------+-----------------+----------------------+-------------------------+------------------------------+-----------------------------+

Here your gen_date can also act as process_date column, with this you compare the two days worth of data for any operation.

这篇关于这个用例可以通过spark的滞后/任何其他功能来完成吗,如果可以,这怎么做的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆