Apply Scala window function when condition is true else fill with last value
Problem description
Given a set of transactions for various email IDs, for example:
val df = Seq(
("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false),
("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true),
("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false),
("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true),
("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false),
("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false),
("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true),
("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true),
("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true),
("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true)
).toDF("email", "timestamp", "transaction_id", "condition")
What I am looking to get is, for each email, the count of transactions within the last 24 hours for which condition is true. If condition is false, I just want the count column to contain the last good count, that is, the count from the most recent row where condition was true. For the data above, here is the expected result:
val expectedDF = Seq(
("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false, 0),
("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true, 1),
("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false, 1),// copy last count since condition is false
("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true, 2),
("a@gmail.com", "2020-10-02 04:04:00", "txid-4", false, 2),// copy last count since condition is false
("a@gmail.com", "2020-10-02 04:05:00", "txid-5", false, 2),// copy last count since condition is false
("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true, 3),
("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true, 1), // beyond 24 hrs from prev transaction
("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true, 1), // new email
("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true, 1) // new email
).toDF("email", "timestamp", "transaction_id", "condition", "count")
What I have done so far:
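import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, unix_timestamp}
import org.apache.spark.sql.types.LongType
// toDF and the $"..." syntax also need spark.implicits._ from the active SparkSession

// Epoch seconds, so that the window range can be expressed in seconds
val new_df = df
  .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))
// Per email: every row from 24 hours before the current row up to the current row
val winSpec = Window
  .partitionBy("email")
  .orderBy(col("transaction_timestamp"))
  .rangeBetween(-24*3600, Window.currentRow)
// Filtering first drops the condition == false rows from the result
val resultDF = new_df
  .filter(col("condition"))
  .withColumn("count", count(col("email")).over(winSpec))
resultDF.show()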
This prints the following, without the rows where condition == false. I want all rows instead, with proper count values as in expectedDF:
("email", "timestamp", "transaction_id", "condition", "count")
("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true, 1),
("a@gmail.com", "2020-10-02 03:04:00", "txid-3", true, 2),
("a@gmail.com", "2020-10-02 05:04:00", "txid-6", true, 3),
("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true, 1),
("b@gmail.com", "2020-12-03 03:04:00", "txid-8", true, 1),
("c@gmail.com", "2020-12-04 06:04:00", "txid-9", true, 1)
I cannot find a way to apply the window function so that it is evaluated only when condition is true and otherwise copies the last good value from when condition was last true. Any help would be appreciated.
Recommended answer
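Do not filter. Instead, make the counted expression conditional by using when. count ignores nulls, and when without an otherwise clause yields null where condition is false, so only the rows with condition true are counted while every row is kept.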
val resultDF = new_df
.withColumn("count", count(when(col("condition"), col("email"))).over(winSpec))
resultDF.show()
+-----------+-------------------+--------------+---------+---------------------+-----+
| email| timestamp|transaction_id|condition|transaction_timestamp|count|
+-----------+-------------------+--------------+---------+---------------------+-----+
|a@gmail.com|2020-10-01 01:04:00| txid-0| false| 1.60151424E9| 0|
|a@gmail.com|2020-10-02 01:04:00| txid-1| true| 1.60160064E9| 1|
|a@gmail.com|2020-10-02 02:04:00| txid-2| false| 1.60160424E9| 1|
|a@gmail.com|2020-10-02 03:04:00| txid-3| true| 1.60160784E9| 2|
|a@gmail.com|2020-10-02 04:04:00| txid-4| false| 1.60161144E9| 2|
|a@gmail.com|2020-10-02 04:05:00| txid-5| false| 1.6016115E9| 2|
|a@gmail.com|2020-10-02 05:04:00| txid-6| true| 1.60161504E9| 3|
|a@gmail.com|2020-10-05 12:04:00| txid-7| true| 1.60189944E9| 1|
|c@gmail.com|2020-12-04 06:04:00| txid-9| true| 1.60706184E9| 1|
|b@gmail.com|2020-12-03 03:04:00| txid-8| true| 1.60696464E9| 1|
+-----------+-------------------+--------------+---------+---------------------+-----+
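Note that the conditional count is recomputed on every row, including the condition == false rows. It agrees with "copy the last good count" for the sample data, but the two can differ once the 24-hour window has moved on since the last true row: a false row more than 24 hours after the last true row gets 0 rather than the previous count. If a strict carry-forward is wanted, one possible sketch is to compute the count only on true rows and forward-fill it with last(..., ignoreNulls = true). The row txid-x, the object name ForwardFillCount, and the column names window_count, raw_count and filled_count are hypothetical, introduced only for illustration; the local SparkSession setup is likewise an assumption.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, count, last, lit, unix_timestamp, when}

object ForwardFillCount {
  // Returns transaction_id -> (window_count, filled_count)
  def run(): Map[String, (Long, Long)] = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("forward-fill-count")
      .config("spark.sql.session.timeZone", "UTC")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("a@gmail.com", "2020-10-01 01:04:00", "txid-0", false),
      ("a@gmail.com", "2020-10-02 01:04:00", "txid-1", true),
      ("a@gmail.com", "2020-10-02 02:04:00", "txid-2", false),
      ("a@gmail.com", "2020-10-05 12:04:00", "txid-7", true),
      // Hypothetical extra row: false, more than 24 hours after the last true row
      ("a@gmail.com", "2020-10-07 12:04:00", "txid-x", false)
    ).toDF("email", "timestamp", "transaction_id", "condition")
      .withColumn("transaction_timestamp", unix_timestamp($"timestamp"))

    // Trailing 24-hour range per email, in epoch seconds
    val rangeWin = Window
      .partitionBy("email")
      .orderBy("transaction_timestamp")
      .rangeBetween(-24 * 3600, Window.currentRow)

    // Every earlier row of the same email, up to and including the current row
    val fillWin = Window
      .partitionBy("email")
      .orderBy("transaction_timestamp")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    val trueRowsInWindow = count(when($"condition", $"email")).over(rangeWin)

    val result = df
      // The conditional count from the answer, evaluated on every row
      .withColumn("window_count", trueRowsInWindow)
      // The same count, kept only on true rows (null on false rows)
      .withColumn("raw_count", when($"condition", trueRowsInWindow))
      // Carry the last non-null count forward; 0 if no true row has been seen yet
      .withColumn("filled_count",
        coalesce(last($"raw_count", ignoreNulls = true).over(fillWin), lit(0L)))

    val out = result
      .select($"transaction_id", $"window_count", $"filled_count")
      .as[(String, Long, Long)]
      .collect()
      .map { case (id, windowCount, filledCount) => id -> (windowCount, filledCount) }
      .toMap

    spark.stop()
    out
  }

  def main(args: Array[String]): Unit =
    run().toSeq.sortBy(_._1).foreach(println)
}
```

For txid-x the two columns differ: window_count is 0 because no true row falls in its 24-hour window, while filled_count carries forward the 1 from txid-7. Which behaviour is right depends on how "last good count" is meant to be read.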