Apply a Scala window function when a condition is true, otherwise fill with the last value


Question


I have a set of transactions for various email IDs, for example:

  import spark.implicits._ // enables toDF and $"..."; assumes a SparkSession named spark

  val df = Seq(
      ("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false),
      ("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true),
      ("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false),
      ("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true),
      ("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false),
      ("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false),
      ("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true),
      ("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true),
      ("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true),
      ("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true)
    ).toDF("email", "timestamp", "transaction_id", "condition")


What I want is, for each transaction, the number of transactions for the same email within the last 24 hours for which condition is true. If condition is false, the count column should just contain the last good count, i.e. the count from the most recent row where condition was true. For the data above, the expected result is:

val expectedDF = Seq(
  ("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false, 0),
  ("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true, 1),
  ("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false, 1),// copy last count since condition is false
  ("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true, 2),
  ("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false, 2),// copy last count since condition is false
  ("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false, 2),// copy last count since condition is false
  ("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true, 3),
  ("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true, 1), // beyond 24 hrs from prev transaction
  ("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true, 1), // new email
  ("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true, 1) // new email
).toDF("email", "timestamp", "transaction_id", "condition", "count")
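To pin the rule down, here is a plain-Scala sketch (no Spark) of one reading of it that reproduces expectedDF for this sample: for each row, count the rows with the same email whose condition is true and whose timestamp lies in the 24 hours up to and including the current row's timestamp. The names Tx, ConditionalWindowCount and SampleData are hypothetical, timestamps are assumed to be UTC, and the O(n²) scan is only meant for checking small examples like this one.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical record type mirroring the columns of df
final case class Tx(email: String, timestamp: String, transactionId: String, condition: Boolean)

object ConditionalWindowCount {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Seconds since the epoch; assumes the timestamps are UTC wall-clock times
  def toSeconds(ts: String): Long =
    LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)

  // For each row: number of rows with the same email, condition == true,
  // and a timestamp in [t - windowSeconds, t] (both ends inclusive)
  def conditionalCounts(txs: Seq[Tx], windowSeconds: Long = 24L * 3600): Seq[Int] =
    txs.map { cur =>
      val t = toSeconds(cur.timestamp)
      txs.count { other =>
        val s = toSeconds(other.timestamp)
        other.email == cur.email && other.condition && s >= t - windowSeconds && s <= t
      }
    }
}

object SampleData {
  val txs: Seq[Tx] = Seq(
    Tx("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false),
    Tx("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true),
    Tx("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false),
    Tx("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true),
    Tx("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false),
    Tx("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false),
    Tx("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true),
    Tx("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true),
    Tx("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true),
    Tx("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true)
  )
}

object ConditionalWindowCountDemo extends App {
  // prints List(0, 1, 1, 2, 2, 2, 3, 1, 1, 1), the count column of expectedDF
  println(ConditionalWindowCount.conditionalCounts(SampleData.txs))
}
```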


What I have tried so far:

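    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, count, unix_timestamp}
    import org.apache.spark.sql.types.LongType

    // Epoch seconds, so that rangeBetween below works in seconds
    val new_df = df
      .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))

    // Per email, ordered by time: rows from 24 hours before the current row up to the current row
    val winSpec = Window
      .partitionBy("email")
      .orderBy(col("transaction_timestamp"))
      .rangeBetween(-24*3600, Window.currentRow)

    // Filtering first removes the condition == false rows from the output entirely
    val resultDF = new_df
      .filter(col("condition"))
      .withColumn("count", count(col("email")).over(winSpec))

    resultDF.show()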


This prints the following, which is missing the rows where condition == false. I want all rows instead, with the correct count values as in expectedDF:

email       | timestamp           | transaction_id | condition | count
a@gmail.com | 2020-10-02 01:04:00 | txid-1         | true      | 1
a@gmail.com | 2020-10-02 03:04:00 | txid-3         | true      | 2
a@gmail.com | 2020-10-02 05:04:00 | txid-6         | true      | 3
a@gmail.com | 2020-10-05 12:04:00 | txid-7         | true      | 1
b@gmail.com | 2020-12-03 03:04:00 | txid-8         | true      | 1
c@gmail.com | 2020-12-04 06:04:00 | txid-9         | true      | 1


I haven't been able to find a way to apply the window function so that it is evaluated only when condition is true and otherwise copies the last good value from the row where condition was last true. Any help would be appreciated.

Recommended answer


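Don't filter the rows; instead, put the condition inside the aggregate with when. Because when(col("condition"), col("email")) has no otherwise, it evaluates to null for rows where condition is false, and count ignores nulls. Every row therefore stays in the result, and its count includes only the condition == true rows within its 24-hour window.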

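import org.apache.spark.sql.functions.{col, count, when}

// No filter: every row is kept, but only condition == true rows are counted
val resultDF = new_df
  .withColumn("count", count(when(col("condition"), col("email"))).over(winSpec))

resultDF.show()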

+-----------+-------------------+--------------+---------+---------------------+-----+
|      email|          timestamp|transaction_id|condition|transaction_timestamp|count|
+-----------+-------------------+--------------+---------+---------------------+-----+
|a@gmail.com|2020-10-01 01:04:00|        txid-0|    false|         1.60151424E9|    0|
|a@gmail.com|2020-10-02 01:04:00|        txid-1|     true|         1.60160064E9|    1|
|a@gmail.com|2020-10-02 02:04:00|        txid-2|    false|         1.60160424E9|    1|
|a@gmail.com|2020-10-02 03:04:00|        txid-3|     true|         1.60160784E9|    2|
|a@gmail.com|2020-10-02 04:04:00|        txid-4|    false|         1.60161144E9|    2|
|a@gmail.com|2020-10-02 04:05:00|        txid-5|    false|          1.6016115E9|    2|
|a@gmail.com|2020-10-02 05:04:00|        txid-6|     true|         1.60161504E9|    3|
|a@gmail.com|2020-10-05 12:04:00|        txid-7|     true|         1.60189944E9|    1|
|c@gmail.com|2020-12-04 06:04:00|        txid-9|     true|         1.60706184E9|    1|
|b@gmail.com|2020-12-03 03:04:00|        txid-8|     true|         1.60696464E9|    1|
+-----------+-------------------+--------------+---------+---------------------+-----+
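The windowed count above and the literal "copy the last good count" rule from the question agree on this sample, but they can differ: a false row more than 24 hours after the last true row gets a window count of 0, whereas carrying the last value forward would repeat that true row's count. If the carry-forward behavior is what you need, one option (a different technique from the answer) is to compute the count on true rows only and then forward-fill it per email; in Spark that fill step is typically written with last(col, ignoreNulls = true) over a window ordered by time with rowsBetween(Window.unboundedPreceding, Window.currentRow). The plain-Scala sketch below models both rules; Event and ForwardFillCount are hypothetical names, and rows are assumed to be in time order within each email.

```scala
// Hypothetical record: timestamps already converted to epoch seconds
final case class Event(email: String, epochSeconds: Long, condition: Boolean)

object ForwardFillCount {
  val Day: Long = 24L * 3600

  // The answer's rule: condition == true rows of the same email in [t - Day, t]
  def windowCounts(events: Seq[Event]): Seq[Int] =
    events.map { cur =>
      events.count { e =>
        e.email == cur.email && e.condition &&
          e.epochSeconds >= cur.epochSeconds - Day && e.epochSeconds <= cur.epochSeconds
      }
    }

  // The question's literal rule: true rows keep their window count;
  // false rows repeat the count of the most recent earlier true row (0 if none).
  // Assumes events are in time order within each email.
  def forwardFilledCounts(events: Seq[Event]): Seq[Int] = {
    val raw = windowCounts(events)
    val (out, _) = events.zip(raw).foldLeft((Vector.empty[Int], Map.empty[String, Int])) {
      case ((acc, lastTrue), (e, n)) =>
        if (e.condition) (acc :+ n, lastTrue.updated(e.email, n))
        else (acc :+ lastTrue.getOrElse(e.email, 0), lastTrue)
    }
    out
  }
}

object ForwardFillDemo extends App {
  import ForwardFillCount._

  val events = Seq(
    Event("a@gmail.com", 0L, true),
    Event("a@gmail.com", 3600L, true),
    Event("a@gmail.com", 3L * Day, false) // more than 24 h after the last true row
  )
  println(windowCounts(events))        // List(1, 2, 0)
  println(forwardFilledCounts(events)) // Vector(1, 2, 2)
}
```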
