Apache Scala/Python Spark 2.4.4: Group data by year range to generate/analyze new feature


Problem Description

I have the following data frame, which I generated for feature engineering. Now, in order to derive another feature, I am trying to create a purchaseYearRange column: I want a column holding a 3-year range, and to aggregate the modelRatio and purchaseRatio values by the itemNo, modelnumber and itemClass columns.

E.g.: for itemNo 7010032, we will have a row with the value 1995-1996-1997 in the new purchaseYearRange column, and the modelRatio and purchaseRatio values for these years will be summed up in that row. Next, I'll do the same for the next 3-year windows, which will be 1996-1997-1998, 1997-1998-1999, etc.
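
(Working that first window out on the sample data below: the ATV rows of itemNo 7010032 for 1995, 1996 and 1997 give purchaseRatio = 2.0 + 2.0 + 2.0 = 6.0 and modelRatio = 1.0048 + 0.8900 + 1.6697 ≈ 3.5645, which matches the expected output in the answer.)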

Also, this item has one row with itemClass RGR; for that row, we'll only have the 1996 data.

Basically: check for 3 years of data in the dataframe and, if it exists, sum modelRatio and purchaseRatio over the three-year window. If three years of data don't exist, sum over two years or one year, depending on data availability.

  • Dataframe

+-------+-----------+----------+------------+--------------------+-------------------+
| itemNo|modelnumber| itemClass|purchaseYear|          ModelRatio|      PurchaseRatio|
+-------+-----------+----------+------------+--------------------+-------------------+
|1321457|  A99AA25CA|       ATV|        1995|   1.801325096248545|                2.0|
|7010032|  A99AA25CB|       ATV|        1995|  1.0048348106365834|                2.0|
|7010032|  A99AA25CB|       ATV|        1996|  0.8899632912525741|                2.0|
|7010032|  A99AA25CB|       RGR|        1996|  0.7899632912525741|                1.0|
|7010032|  A99AA25CB|       ATV|        1997|   1.669710806697108|                2.0|
|7010032|  A99AA25CB|       ATV|        1998|  0.9982988629241651|                2.0|
|7010032|  A99AA25CB|       ATV|        1999|0.006535947712418301|                1.0|
|7552901|  A99AA25CD|       ATV|        1995|   37.83901871250784|               12.0|
|7552901|  A99AA25CD|       ATV|        1996|0.026143790849673203|                1.0|
|7552901|  A99AA25CD|       ATV|        1997|  0.9375951293759512|                2.0|
+-------+-----------+----------+------------+--------------------+-------------------+

I am new to Scala Spark. I tried using .createOrReplaceTempView("test") and then applying SQL operations, but it gets very complex this way. Could you please share how I can get it done? Please feel free to suggest a solution in Python and/or Scala.
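
For reference, here is a minimal PySpark sketch (not part of the original question; Spark 2.4.x assumed) that rebuilds the sample dataframe and registers the test temp view used in the answer below:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("year-range-feature").getOrCreate()

# Rows copied from the sample dataframe above.
rows = [
    (1321457, "A99AA25CA", "ATV", 1995, 1.801325096248545, 2.0),
    (7010032, "A99AA25CB", "ATV", 1995, 1.0048348106365834, 2.0),
    (7010032, "A99AA25CB", "ATV", 1996, 0.8899632912525741, 2.0),
    (7010032, "A99AA25CB", "RGR", 1996, 0.7899632912525741, 1.0),
    (7010032, "A99AA25CB", "ATV", 1997, 1.669710806697108, 2.0),
    (7010032, "A99AA25CB", "ATV", 1998, 0.9982988629241651, 2.0),
    (7010032, "A99AA25CB", "ATV", 1999, 0.006535947712418301, 1.0),
    (7552901, "A99AA25CD", "ATV", 1995, 37.83901871250784, 12.0),
    (7552901, "A99AA25CD", "ATV", 1996, 0.026143790849673203, 1.0),
    (7552901, "A99AA25CD", "ATV", 1997, 0.9375951293759512, 2.0),
]
df = spark.createDataFrame(
    rows,
    ["itemNo", "modelnumber", "itemClass", "purchaseYear", "ModelRatio", "PurchaseRatio"],
)
# purchaseYear is inferred as a numeric column, which the RANGE window below requires.
df.createOrReplaceTempView("test")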

Recommended Answer

IIUC, you can use a Spark SQL window function as follows (make sure purchaseYear is a numeric or timestamp column):

Edit: per the comments, added all_purchase_years to include the full 3-year sequence. Notice that the ORDER BY itemNo, purchaseYear clause is for demonstration purposes only. (The sample output below was produced before the all_purchase_years column was added, so that column does not appear in it.)

spark.sql(""" 
       SELECT itemNo 
       ,      modelnumber 
       ,      itemClass 
       ,      concat_ws('-', sort_array(collect_set(purchaseYear) OVER w1)) AS purchase_years 
       ,      concat_ws('-', sequence(purchaseYear, purchaseYear+2)) AS all_purchase_years
       ,      sum(PurchaseRatio) OVER w1 AS sum_PurchaseRatio 
       ,      sum(ModelRatio) OVER w1 AS sum_ModelRatio 
       FROM test 
       ORDER BY itemNo, purchaseYear 
       WINDOW w1 AS (
           PARTITION BY (itemNo, modelnumber, itemClass)  
           ORDER BY purchaseYear 
           RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING
       ) 
""").show() 
#+-------+-----------+---------+--------------+-----------------+--------------------+
#| itemNo|modelnumber|itemClass|purchase_years|sum_PurchaseRatio|      sum_ModelRatio|
#+-------+-----------+---------+--------------+-----------------+--------------------+
#|1321457|  A99AA25CA|      ATV|          1995|              2.0|   1.801325096248545|
#|7010032|  A99AA25CB|      ATV|1995-1996-1997|              6.0|   3.564508908586266|
#|7010032|  A99AA25CB|      RGR|          1996|              1.0|  0.7899632912525741|
#|7010032|  A99AA25CB|      ATV|1996-1997-1998|              6.0|  3.5579729608738475|
#|7010032|  A99AA25CB|      ATV|1997-1998-1999|              5.0|  2.6745456173336914|
#|7010032|  A99AA25CB|      ATV|     1998-1999|              3.0|  1.0048348106365834|
#|7010032|  A99AA25CB|      ATV|          1999|              1.0|0.006535947712418301|
#|7552901|  A99AA25CD|      ATV|1995-1996-1997|             15.0|   38.80275763273346|
#|7552901|  A99AA25CD|      ATV|     1996-1997|              3.0|  0.9637389202256245|
#|7552901|  A99AA25CD|      ATV|          1997|              2.0|  0.9375951293759512|
#+-------+-----------+---------+--------------+-----------------+--------------------+
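
Not from the original answer: for those who prefer the DataFrame API over raw SQL, here is a rough PySpark equivalent (a sketch, assuming the df built in the reproduction snippet above; the array<string> casts are added because concat_ws expects string elements):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Same frame as w1 in the SQL version:
# RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING over purchaseYear.
w1 = (Window.partitionBy("itemNo", "modelnumber", "itemClass")
            .orderBy("purchaseYear")
            .rangeBetween(Window.currentRow, 2))

result = (df
    .select(
        "itemNo", "modelnumber", "itemClass", "purchaseYear",
        F.concat_ws("-", F.sort_array(F.collect_set("purchaseYear").over(w1))
                          .cast("array<string>")).alias("purchase_years"),
        F.concat_ws("-", F.sequence(F.col("purchaseYear"), F.col("purchaseYear") + 2)
                          .cast("array<string>")).alias("all_purchase_years"),
        F.sum("PurchaseRatio").over(w1).alias("sum_PurchaseRatio"),
        F.sum("ModelRatio").over(w1).alias("sum_ModelRatio"),
    )
    .orderBy("itemNo", "purchaseYear"))
result.show(truncate=False)

Here rangeBetween(Window.currentRow, 2) mirrors the RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING frame of the SQL version, so a window naturally shrinks to two years or one year when later data is missing.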
