Apply Scala window function when condition is true else fill with last value

Question

Given a set of transactions for various email ids, for example:

val df = Seq(
  ("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false),
  ("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true),
  ("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false),
  ("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true),
  ("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false),
  ("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false),
  ("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true),
  ("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true),
  ("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true),
  ("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true)
).toDF("email", "timestamp", "transaction_id", "condition")

What I am looking for is the count of transactions, grouped by email, within the last 24 hours for which condition is true. If condition is false, the count column should simply carry the last good count, i.e. the one from the most recent row where condition was true. For the data above, the expected result is:

val expectedDF = Seq(
  ("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false, 0),
  ("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true, 1),
  ("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false, 1),// copy last count since condition is false
  ("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true, 2),
  ("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false, 2),// copy last count since condition is false
  ("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false, 2),// copy last count since condition is false
  ("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true, 3),
  ("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true, 1), // beyond 24 hrs from prev transaction
  ("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true, 1), // new email
  ("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true, 1) // new email
).toDF("email", "timestamp", "transaction_id", "condition", "count")

What I have done so far:

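    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.LongType

    // Epoch seconds, so the window range can be expressed in seconds.
    val new_df = df
      .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))

    // Per email: all rows from 24 hours before the current row up to the current row.
    val winSpec = Window
      .partitionBy("email")
      .orderBy(col("transaction_timestamp"))
      .rangeBetween(-24*3600, Window.currentRow)

    // The filter drops every row where condition is false before counting.
    val resultDF = new_df
      .filter(col("condition"))
      .withColumn("count", count(col("email")).over(winSpec))

    resultDF.show()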

This prints the following, which omits the rows where condition == false. I want all rows instead, with proper count values as in expectedDF:

("email",      | "timestamp"         | "transaction_id" | "condition" | "count")
("a@gmail.com", "2020-10-02 01:04:00", "txid-1",           true,            1),
("a@gmail.com", "2020-10-02 03:04:00", "txid-3",           true,            2),
("a@gmail.com", "2020-10-02 05:04:00", "txid-6",           true,            3),
("a@gmail.com", "2020-10-05 12:04:00", "txid-7",           true,            1),
("b@gmail.com", "2020-12-03 03:04:00", "txid-8",           true,            1),
("c@gmail.com", "2020-12-04 06:04:00", "txid-9",           true,            1)

I cannot find a way to apply the window function so that it is evaluated only when condition is true and otherwise copies the last good value, from when condition was last true. Any help will be appreciated.

Recommended answer

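Do not filter. Instead, count a conditional expression built with when. Without an otherwise branch, when(col("condition"), col("email")) yields null for rows where condition is false, and count skips nulls, so every row is kept while only the true rows inside the 24-hour window are counted.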

val resultDF = new_df
  .withColumn("count", count(when(col("condition"), col("email"))).over(winSpec))

resultDF.show()

+-----------+-------------------+--------------+---------+---------------------+-----+
|      email|          timestamp|transaction_id|condition|transaction_timestamp|count|
+-----------+-------------------+--------------+---------+---------------------+-----+
|a@gmail.com|2020-10-01 01:04:00|        txid-0|    false|         1.60151424E9|    0|
|a@gmail.com|2020-10-02 01:04:00|        txid-1|     true|         1.60160064E9|    1|
|a@gmail.com|2020-10-02 02:04:00|        txid-2|    false|         1.60160424E9|    1|
|a@gmail.com|2020-10-02 03:04:00|        txid-3|     true|         1.60160784E9|    2|
|a@gmail.com|2020-10-02 04:04:00|        txid-4|    false|         1.60161144E9|    2|
|a@gmail.com|2020-10-02 04:05:00|        txid-5|    false|          1.6016115E9|    2|
|a@gmail.com|2020-10-02 05:04:00|        txid-6|     true|         1.60161504E9|    3|
|a@gmail.com|2020-10-05 12:04:00|        txid-7|     true|         1.60189944E9|    1|
|c@gmail.com|2020-12-04 06:04:00|        txid-9|     true|         1.60706184E9|    1|
|b@gmail.com|2020-12-03 03:04:00|        txid-8|     true|         1.60696464E9|    1|
+-----------+-------------------+--------------+---------+---------------------+-----+
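
Note that the expression above recounts the window at every row, including rows where condition is false. For the sample data this coincides with the last good count, but a false row that arrives more than 24 hours after the most recent true row would get 0 rather than the carried-over value. If a strict carry-forward is wanted, one possible sketch (not part of the original answer) is to keep the windowed count only on true rows and then forward-fill it with last(..., ignoreNulls = true). The names LastGoodCount, withLastGoodCount, raw_count, last24h and upToHere, the extra row txid-x, and the local SparkSession with a UTC session time zone are all assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical helper object; not from the original answer.
object LastGoodCount {

  // Expects columns: email, timestamp ("yyyy-MM-dd HH:mm:ss" string), condition (boolean).
  def withLastGoodCount(df: DataFrame): DataFrame = {
    val withTs = df.withColumn("transaction_timestamp", unix_timestamp(col("timestamp")))

    // Trailing 24-hour range frame (in seconds) per email.
    val last24h = Window
      .partitionBy("email")
      .orderBy(col("transaction_timestamp"))
      .rangeBetween(-24 * 3600, Window.currentRow)

    // Every row from the start of the partition up to the current row.
    val upToHere = Window
      .partitionBy("email")
      .orderBy(col("transaction_timestamp"))
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    withTs
      // Windowed count of true rows, kept only on true rows (null on false rows).
      .withColumn("raw_count",
        when(col("condition"), count(when(col("condition"), lit(1))).over(last24h)))
      // Forward-fill: most recent non-null raw_count, or 0 if there is none yet.
      .withColumn("count",
        coalesce(last(col("raw_count"), ignoreNulls = true).over(upToHere), lit(0L)))
      .drop("raw_count")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("last-good-count")
      .config("spark.sql.session.timeZone", "UTC") // assumption: fixed zone for reproducibility
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false),
      ("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true),
      ("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false),
      // Hypothetical extra row: false, and more than 24 hours after the last true row.
      ("a@gmail.com", "2020-10-04 02:04:00", "txid-x", false)
    ).toDF("email", "timestamp", "transaction_id", "condition")

    withLastGoodCount(df).orderBy("email", "transaction_timestamp").show(false)
    spark.stop()
  }
}
```

In this sketch the hypothetical row txid-x keeps the count carried over from txid-1, whereas count(when(...)) over winSpec alone would give it 0 because no true row falls inside its 24-hour window. Which behavior is right depends on how "last good count" is meant to be read; rows that share the same timestamp within one email would also need a tie-breaking column in the ordering.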
