Spark SQL window function with complex condition


Question

This is probably easiest to explain through an example. Suppose I have a DataFrame of user logins to a website, for instance:

scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows

I would like to add a column to this indicating when they became an active user on the site. But there is one caveat: there is a time period during which a user is considered active, and after this period, if they log in again, their became_active date resets. Suppose this period is 5 days. Then the desired table derived from the above would be something like this:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+

So, in particular, SirChillingtonIV's became_active date was reset because their second login came after the active period expired, but Booooooo99900098's became_active date was not reset the second time they logged in, because it fell within the active period.

My initial thought was to use window functions with lag, and then use the lagged values to fill the became_active column; for instance, something starting roughly like:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))

Then, the rule to fill in the became_active date would be: if tmp is null (i.e., it is the first ever login) or if login_date - tmp >= 5, then became_active = login_date; otherwise, go to the next most recent value in tmp and apply the same rule. This suggests a recursive approach, which I'm having trouble imagining a way to implement.

My questions: Is this a viable approach, and if so, how can I "go back" and look at earlier values of tmp until I find one where I stop? To my knowledge, I can't iterate through the values of a Spark SQL Column. Is there another way to achieve this result?

Answer

Spark >= 3.2

Recent Spark releases provide native support for session windows in both batch and structured streaming queries (see SPARK-10816 and its sub-tasks, especially SPARK-34893).

The official documentation provides a nice usage example.
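
As a rough sketch of what that native API could look like for this problem (this is not part of the original answer; it assumes df is the login DataFrame from the question and that login_date can be cast to a timestamp):

import org.apache.spark.sql.functions.{col, min, session_window, to_timestamp}

// Group each user's logins into sessions that close after a 5-day gap,
// then take the earliest login in each session as became_active.
val bySession = df
  .withColumn("login_ts", to_timestamp(col("login_date")))
  .groupBy(col("user_name"), session_window(col("login_ts"), "5 days"))
  .agg(min(col("login_date")).as("became_active"))

Note that this yields one row per (user_name, session window) rather than one row per login, so it would still have to be joined back to the original rows, and the boundary semantics of a 5-day gap may differ slightly from the datediff-based rule used below.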

Spark < 3.2

Here is the trick. Import a bunch of functions:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
// the $"column" syntax used below also needs the SparkSession implicits,
// e.g. import spark.implicits._ (already in scope in spark-shell)

Define the windows:

val userWindow = Window.partitionBy("user_name").orderBy("login_date")
val userSessionWindow = Window.partitionBy("user_name", "session")

Find the points where new sessions start:

// 1 when the gap since the previous login is more than 5 days (a new session), 0 otherwise
val newSession = (coalesce(
  datediff($"login_date", lag($"login_date", 1).over(userWindow)),
  lit(0)
) > 5).cast("bigint")

// a running sum of these flags gives a per-user session number
val sessionized = df.withColumn("session", sum(newSession).over(userWindow))

Find the earliest date per session:

val result = sessionized
  .withColumn("became_active", min($"login_date").over(userSessionWindow))
  .drop("session")

With the dataset defined as:

val df = Seq(
  ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
  ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), 
  ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
  ("SirChillingtonIV", "2012-08-11")
).toDF("user_name", "login_date")

The result is:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04| <- The first session for user
|SirChillingtonIV|2012-01-11|   2012-01-11| <- The second session for user
|SirChillingtonIV|2012-01-14|   2012-01-11| 
|SirChillingtonIV|2012-08-11|   2012-08-11| <- The third session for user
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+
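
To see why this works, it can help to look at the intermediate session column before it is dropped: it starts at 0 for each user's first login and increases by one every time the 5-day gap is exceeded, so became_active is simply the earliest login date within each (user_name, session) group. A small inspection snippet (not part of the original answer):

// Show the per-user session counter alongside became_active
sessionized
  .withColumn("became_active", min($"login_date").over(userSessionWindow))
  .orderBy($"user_name", $"login_date")
  .show()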
