Spark window function with condition on current row


Problem Description

I am trying to count, for a given order_id, how many orders there were in the past 365 days which had a payment. And that part is not the problem: I use a window function.

Where it gets tricky for me is: I don't want to count orders in this time window where the payment_date is after the order_date of the current order_id.

Currently, I have something like this:

val window: WindowSpec = Window
  .partitionBy("customer_id")
  .orderBy("order_date")
  .rangeBetween(-365*days, -1)

df.withColumn("paid_order_count", count("*") over window)

which would count all orders for the customer within the last 365 days before their current order.

How can I now incorporate a condition into the counting that takes the order_date of the current order into account?

Example:

+---------+-----------+-------------+------------+
|order_id |order_date |payment_date |customer_id |
+---------+-----------+-------------+------------+
|1        |2017-01-01 |2017-01-10   |A           |
|2        |2017-02-01 |2017-02-10   |A           |
|3        |2017-02-02 |2017-02-20   |A           |
+---------+-----------+-------------+------------+

The resulting table should look like this:

+---------+-----------+-------------+------------+-----------------+
|order_id |order_date |payment_date |customer_id |paid_order_count |
+---------+-----------+-------------+------------+-----------------+
|1        |2017-01-01 |2017-01-10   |A           |0                |
|2        |2017-02-01 |2017-02-10   |A           |1                |
|3        |2017-02-02 |2017-02-20   |A           |1                |
+---------+-----------+-------------+------------+-----------------+

For order_id = 3 the paid_order_count should not be 2 but 1, since order_id = 2 is paid after order_id = 3 is placed.

I hope that I explained my problem well and look forward to your ideas!

Recommended Answer

Very good question! A couple of remarks: using rangeBetween creates a fixed frame that is based on the number of rows in it and not on values, so it will be problematic in two cases:

  1. The customer does not have orders every day, so a 365-row window may include rows with an order_date from more than a year ago
  2. If the customer has more than one order per day, the one-year coverage gets mixed up
  3. A combination of 1 and 2

Also, rangeBetween does not work with Date and Timestamp datatypes.
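To make the limitation concrete, here is a minimal sketch (my addition, not part of the original answer) of what a purely value-based range frame gives you once order_date has been cast to epoch seconds as order_date_ts, as done in the code further below: it handles the 365-day cutoff, but the frame still cannot compare each collected payment_date against the current row's order_date, which is why the answer falls back to collect_list plus a UDF.

// Sketch only: assumes df already has the numeric column "order_date_ts" (epoch seconds, built below).
val secondsPerDay = 60L * 60 * 24
val valueRangeWindow = Window
  .partitionBy("customer_id")
  .orderBy("order_date_ts")               // numeric ordering, so the range offsets are in seconds
  .rangeBetween(-365 * secondsPerDay, -1)

// Counts every previous order in the last 365 days, but cannot express
// "payment_date before the current order_date" inside the frame:
val naiveCount = df.withColumn("prev_order_count", count("*") over valueRangeWindow)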

To solve it, it is possible to use a window function with lists and a UDF:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._ // assumes an existing SparkSession named `spark` (e.g. spark-shell); needed for toDF and $"..."

val df = spark.sparkContext.parallelize(Seq(
    (1, "2017-01-01", "2017-01-10", "A"),
    (2, "2017-02-01", "2017-02-10", "A"),
    (3, "2017-02-02", "2017-02-20", "A")
  )).toDF("order_id", "order_date", "payment_date", "customer_id")
  .withColumn("order_date_ts", to_timestamp($"order_date", "yyyy-MM-dd").cast("long"))
  .withColumn("payment_date_ts", to_timestamp($"payment_date", "yyyy-MM-dd").cast("long"))

// df.printSchema()
// df.show(false)

// frame: all earlier orders of the same customer; the 365-day cut is applied in the UDF
val window = Window.partitionBy("customer_id").orderBy("order_date_ts")
  .rangeBetween(Window.unboundedPreceding, -1)

// counts the collected payment timestamps that fall in [order_date - days, order_date)
val count_filtered_dates = udf( (days: Int, top: Long, array: Seq[Long]) => {
  val bottom = top - (days * 60 * 60 * 24).toLong // Spark timestamps are in seconds; this is the cutoff `days` ago
  array.count(v => v >= bottom && v < top)
})

val res = df.withColumn("paid_orders", collect_list("payment_date_ts") over window)
  .withColumn("paid_order_count", count_filtered_dates(lit(365), $"order_date_ts", $"paid_orders"))

res.show(false)

Output:

+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|order_id|order_date|payment_date|customer_id|order_date_ts|payment_date_ts|paid_orders             |paid_order_count|
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|1       |2017-01-01|2017-01-10  |A          |1483228800   |1484006400     |[]                      |0               |
|2       |2017-02-01|2017-02-10  |A          |1485907200   |1486684800     |[1484006400]            |1               |
|3       |2017-02-02|2017-02-20  |A          |1485993600   |1487548800     |[1484006400, 1486684800]|1               |
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
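Reading off the third row: the frame collected both earlier payments, but only 1484006400 (2017-01-10) falls inside the UDF's interval [1485993600 - 365*86400, 1485993600) = [1454457600, 1485993600), while 1486684800 (2017-02-10) is after order 3's order_date_ts, so paid_order_count is 1.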

Converting the dates to Spark timestamps in seconds makes the lists more memory-efficient.

This is the easiest code to implement, but not the most optimal, since the lists take up some memory; a custom UDAF would be best, but that requires more coding and might be done later. The approach still works if you have thousands of orders per customer.
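As a middle ground (my sketch rather than the answer's code, and assuming Spark 2.4+), the UDF can be swapped for the built-in higher-order function filter, which keeps the collected list but avoids the UDF call:

// Assumes the same `df` and `window` as above; Spark 2.4+ only.
val resBuiltin = df
  .withColumn("paid_orders", collect_list("payment_date_ts") over window)
  .withColumn("paid_order_count",
    expr("size(filter(paid_orders, p -> p >= order_date_ts - 365 * 86400 AND p < order_date_ts))"))

resBuiltin.show(false)

The higher-order function stays inside Catalyst, so there is no UDF black box, though the list still has to be materialized per row.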

