How to use window functions efficiently to decide the next N rows based on the previous N values


Problem Description

I have the following data.

+----------+----+-------+-----------------------+
|      date|item|avg_val|conditions             |
+----------+----+-------+-----------------------+
|01-10-2020|   x|     10|                      0|
|02-10-2020|   x|     10|                      0|
|03-10-2020|   x|     15|                      1|
|04-10-2020|   x|     15|                      1|
|05-10-2020|   x|      5|                      0|
|06-10-2020|   x|     13|                      1|
|07-10-2020|   x|     10|                      1|
|08-10-2020|   x|     10|                      0|
|09-10-2020|   x|     15|                      1|
|01-10-2020|   y|     10|                      0|
|02-10-2020|   y|     18|                      0|
|03-10-2020|   y|      6|                      1|
|04-10-2020|   y|     10|                      0|
|05-10-2020|   y|     20|                      0|
+----------+----+-------+-----------------------+

I am trying to create a new column called flag based on the following:

  1. If the conditions value is 0, the new column value will be 0.
  2. If the conditions value is 1, the new column will be 1 and the next N rows will be 0, i.e. there is no need to check the next N values. This process is applied for each item, i.e. partitioned by item.

I have used N = 4 here.

I have used the code below, but it does not use window functions efficiently. Is there an optimized way?

DROP TEMPORARY TABLE t2;
CREATE TEMPORARY TABLE t2
SELECT *,
       MAX(conditions) OVER (PARTITION BY item ORDER BY item, `date` ROWS 4 PRECEDING) AS new_row
FROM record
ORDER BY item, `date`;

DROP TEMPORARY TABLE t3;
CREATE TEMPORARY TABLE t3
SELECT *,
       ROW_NUMBER() OVER (PARTITION BY item, new_row ORDER BY item, `date`) AS e
FROM t2;

SELECT *,
       CASE WHEN new_row = 1 AND e % 5 > 1 THEN 0
            WHEN new_row = 1 AND e % 5 = 1 THEN 1
            ELSE 0
       END AS flag
FROM t3;

The expected output is like:

+----------+----+-------+-----------------------+-----+
|      date|item|avg_val|conditions             |flag |
+----------+----+-------+-----------------------+-----+
|01-10-2020|   x|     10|                      0|    0|
|02-10-2020|   x|     10|                      0|    0|
|03-10-2020|   x|     15|                      1|    1|
|04-10-2020|   x|     15|                      1|    0|
|05-10-2020|   x|      5|                      0|    0|
|06-10-2020|   x|     13|                      1|    0|
|07-10-2020|   x|     10|                      1|    0|
|08-10-2020|   x|     10|                      0|    0|
|09-10-2020|   x|     15|                      1|    1|
|01-10-2020|   y|     10|                      0|    0|
|02-10-2020|   y|     18|                      0|    0|
|03-10-2020|   y|      6|                      1|    1|
|04-10-2020|   y|     10|                      0|    0|
|05-10-2020|   y|     20|                      0|    0|
+----------+----+-------+-----------------------+-----+

But I am unable to get this output; I have tried many approaches.

Answer

As suggested in the comments (by @nbk and @Akina), you will need some sort of iterator to implement the logic. With Spark SQL and Spark version 2.4+, we can use the built-in function aggregate and set an array of structs plus a counter as the accumulator. Below is an example dataframe and a table named record (assume the values in the conditions column are either 0 or 1):

// in spark-shell the required implicits are in scope; in a compiled app, add `import spark.implicits._`
val df = Seq(
    ("01-10-2020", "x", 10, 0), ("02-10-2020", "x", 10, 0), ("03-10-2020", "x", 15, 1),
    ("04-10-2020", "x", 15, 1), ("05-10-2020", "x", 5, 0), ("06-10-2020", "x", 13, 1),
    ("07-10-2020", "x", 10, 1), ("08-10-2020", "x", 10, 0), ("09-10-2020", "x", 15, 1),
    ("01-10-2020", "y", 10, 0), ("02-10-2020", "y", 18, 0), ("03-10-2020", "y", 6, 1),
    ("04-10-2020", "y", 10, 0), ("05-10-2020", "y", 20, 0)
).toDF("date", "item", "avg_val", "conditions")

df.createOrReplaceTempView("record")

SQL:

spark.sql("""
  SELECT t1.item, m.*
  FROM (
    SELECT item,
      sort_array(collect_list(struct(date,avg_val,int(conditions) as conditions,conditions as flag))) as dta
    FROM record
    GROUP BY item
  ) as t1 LATERAL VIEW OUTER inline(
    aggregate(
      /* expr: set up array `dta` from the 2nd element to the last 
       *       note that indices for the slice function are 1-based, while dta[i] is 0-based
       */
      slice(dta,2,size(dta)),
      /* start: set up and initialize `acc` to a struct containing two fields:
       * - dta: an array of structs with a single element dta[0]
       * - counter: number of rows after flag=1, can be from `0` to `N+1`
       */
      (array(dta[0]) as dta, dta[0].conditions as counter),
      /* merge: iterate through the `expr` using x and update two fields of `acc`
       * - dta: append values from x to acc.dta array using concat + array functions
       *        update flag using `IF(acc.counter IN (0,5) and x.conditions = 1, 1, 0)`
       * - counter: increment by 1 if acc.counter is between 1 and 4
       *            , otherwise set value to x.conditions
       */
      (acc, x) -> named_struct(
          'dta', concat(acc.dta, array(named_struct(
              'date', x.date,
              'avg_val', x.avg_val,
              'conditions', x.conditions,
              'flag', IF(acc.counter IN (0,5) and x.conditions = 1, 1, 0)
            ))),
          'counter', IF(acc.counter > 0 and acc.counter < 5, acc.counter+1, x.conditions)
        ),
      /* finish: retrieve acc.dta only and discard acc.counter */
      acc -> acc.dta
    )
  ) m
""").show(50)

Result:

+----+----------+-------+----------+----+
|item|      date|avg_val|conditions|flag|
+----+----------+-------+----------+----+
|   x|01-10-2020|     10|         0|   0|
|   x|02-10-2020|     10|         0|   0|
|   x|03-10-2020|     15|         1|   1|
|   x|04-10-2020|     15|         1|   0|
|   x|05-10-2020|      5|         0|   0|
|   x|06-10-2020|     13|         1|   0|
|   x|07-10-2020|     10|         1|   0|
|   x|08-10-2020|     10|         0|   0|
|   x|09-10-2020|     15|         1|   1|
|   y|01-10-2020|     10|         0|   0|
|   y|02-10-2020|     18|         0|   0|
|   y|03-10-2020|      6|         1|   1|
|   y|04-10-2020|     10|         0|   0|
|   y|05-10-2020|     20|         0|   0|
+----+----------+-------+----------+----+

Where:

  1. use groupby to collect rows of the same item into an array of structs named dta with 4 fields: date, avg_val, conditions and flag, sorted by date
  2. use the aggregate function to iterate through the above array of structs and update the flag field based on counter and conditions (for details, see the comments in the SQL code above; a plain-Scala sketch of this step follows the list)
  3. use LATERAL VIEW and the inline function to explode the resulting array of structs from the aggregate function
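
For intuition, the merge logic is equivalent to the following plain Scala fold over one item's conditions (a minimal sketch with a hypothetical helper name `flags`, not part of the answer's Spark job; N = 4):

// flags: recompute the flag column for one item's conditions, N = 4 by default
// counter semantics mirror the SQL: 0 or N+1 means "eligible", 1..N means "blocked"
def flags(conditions: Seq[Int], n: Int = 4): Seq[Int] =
  conditions.foldLeft((Vector.empty[Int], 0)) { case ((acc, counter), c) =>
    val flag = if ((counter == 0 || counter == n + 1) && c == 1) 1 else 0
    val next = if (counter > 0 && counter < n + 1) counter + 1 else c
    (acc :+ flag, next)
  }._1

// e.g. flags(Seq(0, 0, 1, 1, 0, 1, 1, 0, 1)) returns Seq(0, 0, 1, 0, 0, 0, 0, 0, 1) -- item x above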

Notes:

(1) The proposed SQL is for N = 4, where we have acc.counter IN (0,5) and acc.counter < 5 in the SQL. For any N, adjust these to acc.counter IN (0,N+1) and acc.counter < N+1. Below is the result for N = 2 with the same sample data:

+----+----------+-------+----------+----+
|item|      date|avg_val|conditions|flag|
+----+----------+-------+----------+----+
|   x|01-10-2020|     10|         0|   0|
|   x|02-10-2020|     10|         0|   0|
|   x|03-10-2020|     15|         1|   1|
|   x|04-10-2020|     15|         1|   0|
|   x|05-10-2020|      5|         0|   0|
|   x|06-10-2020|     13|         1|   1|
|   x|07-10-2020|     10|         1|   0|
|   x|08-10-2020|     10|         0|   0|
|   x|09-10-2020|     15|         1|   1|
|   y|01-10-2020|     10|         0|   0|
|   y|02-10-2020|     18|         0|   0|
|   y|03-10-2020|      6|         1|   1|
|   y|04-10-2020|     10|         0|   0|
|   y|05-10-2020|     20|         0|   0|
+----+----------+-------+----------+----+
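
If N changes often, the two constants can be derived from a single variable by interpolating them into the SQL string (a small sketch; the query text is otherwise the same as above, minus the comments):

// build the two N-dependent expressions once and splice them into the query
val N = 2
val query = s"""
  SELECT t1.item, m.* FROM (
    SELECT item,
      sort_array(collect_list(struct(date,avg_val,int(conditions) as conditions,conditions as flag))) as dta
    FROM record GROUP BY item
  ) as t1 LATERAL VIEW OUTER inline(
    aggregate(
      slice(dta,2,size(dta)),
      (array(dta[0]) as dta, dta[0].conditions as counter),
      (acc, x) -> named_struct(
          'dta', concat(acc.dta, array(named_struct(
              'date', x.date, 'avg_val', x.avg_val, 'conditions', x.conditions,
              'flag', IF(acc.counter IN (0,${N + 1}) and x.conditions = 1, 1, 0)))),
          'counter', IF(acc.counter > 0 and acc.counter < ${N + 1}, acc.counter+1, x.conditions)),
      acc -> acc.dta
    )
  ) m"""
spark.sql(query).show(50)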

(2) We use dta[0] to initialize acc, which includes both the values and the datatypes of its fields. Ideally, we should make sure the data types of these fields are right so that all calculations are conducted correctly. For example, when calculating acc.counter, if conditions is StringType, acc.counter+1 will return a StringType holding a DoubleType value:

spark.sql("select '2'+1").show()
+---------------------------------------+
|(CAST(2 AS DOUBLE) + CAST(1 AS DOUBLE))|
+---------------------------------------+
|                                    3.0|
+---------------------------------------+

This could yield floating-point errors when comparing the value with integers using acc.counter IN (0,5) or acc.counter < 5. Based on the OP's feedback, this produced incorrect results without any WARNING/ERROR message.
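
To see the silent coercion in action, compare a string-derived value with integers; both sides are cast to DOUBLE and the query runs without any warning (a small illustration, not from the original answer):

// the comparisons silently cast both sides to DOUBLE instead of raising an error
spark.sql("SELECT '2'+1 < 5 AS lt, '2'+1 IN (0,5) AS in_set").show()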

  • One workaround is to specify exact field types using CAST when setting up the 2nd argument of the aggregate function, so that it reports an ERROR when any types mismatch; see below:

CAST((array(dta[0]), dta[0].conditions) as struct<dta:array<struct<date:string,avg_val:string,conditions:int,flag:int>>,counter:int>),
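
To see which element types you are actually starting from before writing such a cast, printing the schema of the grouped array is a quick check (hypothetical, not in the original answer):

// inspect the element types of dta to check whether `conditions` needs an explicit cast
spark.sql("""
  SELECT item,
    sort_array(collect_list(struct(date, avg_val, conditions, conditions as flag))) as dta
  FROM record
  GROUP BY item
""").printSchema()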

  • Another solution is to force the types when creating the dta column; in this example, see int(conditions) as conditions in the code below:

    SELECT item,
      sort_array(collect_list(struct(date,avg_val,int(conditions) as conditions,conditions as flag))) as dta
    FROM record
    GROUP BY item
    

  • We can also force the data type inside the calculation, for example, see int(acc.counter+1) below:

    IF(acc.counter > 0 and acc.counter < 5, int(acc.counter+1), x.conditions)      
    
