Spark-SQL Window functions on Dataframe - Finding first timestamp in a group

Problem description

I have the dataframe below (say, UserData):

uid region  timestamp
a   1   1
a   1   2
a   1   3
a   1   4
a   2   5
a   2   6
a   2   7
a   3   8
a   4   9
a   4   10
a   4   11
a   4   12
a   1   13
a   1   14
a   3   15
a   3   16
a   5   17
a   5   18
a   5   19
a   5   20

This data is nothing but a user (uid) travelling across different regions (region) at different times (timestamp). Presently, the timestamp is shown as an 'int' for simplicity. Note that the above dataframe will not necessarily be in increasing order of timestamp, and there may be rows from different users in between. For simplicity, I have shown the dataframe for a single user only, in monotonically increasing order of timestamp.

My goal is to find how much time user 'a' spent in each region, and in what order. So my final expected output looks like:

uid region  regionTimeStart regionTimeEnd
a   1   1   5
a   2   5   8
a   3   8   9
a   4   9   13
a   1   13  15
a   3   15  17
a   5   17  20

Based on my findings, Spark SQL window functions can be used for this purpose. I have tried the following:

val w = Window
  .partitionBy("region")
  .partitionBy("uid")
  .orderBy("timestamp")

val resultDF = UserData.select(
  UserData("uid"), UserData("timestamp"),
  UserData("region"), rank().over(w).as("Rank"))

But from here onwards, I am not sure how to get the regionTimeStart and regionTimeEnd columns. The regionTimeEnd column is nothing but the 'lead' of regionTimeStart, except for the last entry in the group.

I see that aggregate operations have 'first' and 'last' functions, but for that I would need to group the data by ('uid', 'region'), which spoils the monotonically increasing order of the path traversed; i.e. at times 13 and 14 the user has come back to region '1', and I want that visit to be retained instead of clubbing it with the initial visit to region '1' at time 1.
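
For example, something along these lines (only a sketch of what I mean, using the first/last aggregate functions on the UserData dataframe above) collapses both visits to region '1' into one row:

import org.apache.spark.sql.functions.{first, last}

// Sketch only: grouping by (uid, region) merges the two separate stays in
// region '1' (timestamps 1-4 and 13-14) into a single row, losing the order
// of travel; first/last are also order-dependent here.
val collapsed = UserData
  .groupBy(UserData("uid"), UserData("region"))
  .agg(first(UserData("timestamp")).as("regionTimeStart"),
       last(UserData("timestamp")).as("regionTimeEnd"))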

It would be very helpful if anyone can guide me. I am new to Spark, and I have a better understanding of the Scala Spark API than of the Python/Java Spark APIs.

Recommended answer

Window functions are indeed useful here, although your approach can work only if you assume that a user visits a given region only once. Also, the window definition you use is incorrect: multiple calls to partitionBy simply return new objects with different window definitions. If you want to partition by multiple columns, you should pass them in a single call (.partitionBy("region", "uid")).
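
For reference, the code below assumes the sample data is available as df. One possible way to build it (a sketch, assuming a SparkSession in scope as spark) is:

import spark.implicits._

// Sample data from the question: (uid, region, timestamp)
val df = Seq(
  ("a", 1, 1), ("a", 1, 2), ("a", 1, 3), ("a", 1, 4),
  ("a", 2, 5), ("a", 2, 6), ("a", 2, 7),
  ("a", 3, 8),
  ("a", 4, 9), ("a", 4, 10), ("a", 4, 11), ("a", 4, 12),
  ("a", 1, 13), ("a", 1, 14),
  ("a", 3, 15), ("a", 3, 16),
  ("a", 5, 17), ("a", 5, 18), ("a", 5, 19), ("a", 5, 20)
).toDF("uid", "region", "timestamp")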

Let's start by marking continuous visits to each region:

import org.apache.spark.sql.functions.{lag, sum, not}
import org.apache.spark.sql.expressions.Window

// Assumes spark.implicits._ is in scope for the $"..." column syntax;
// df is the input dataframe (UserData above).
val w = Window.partitionBy($"uid").orderBy($"timestamp")

// change is 1 whenever the region differs from the previous row's region
// (the null-safe equality <=> also makes the first row of each partition a change)
val change = (not(lag($"region", 1).over(w) <=> $"region")).cast("int")
// a running sum of changes gives a distinct id (ind) for each continuous stay
val ind = sum(change).over(w)

val dfWithInd = df.withColumn("ind", ind)
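
At this point every continuous stay shares one ind value. As a rough sanity check (a sketch of the expected grouping derived from the sample data, not verified output):

// Expected grouping for the sample data (one ind value per continuous stay):
//   region 1, timestamps 1-4   -> ind 1
//   region 2, timestamps 5-7   -> ind 2
//   region 3, timestamp  8     -> ind 3
//   region 4, timestamps 9-12  -> ind 4
//   region 1, timestamps 13-14 -> ind 5
//   region 3, timestamps 15-16 -> ind 6
//   region 5, timestamps 17-20 -> ind 7
dfWithInd.orderBy($"uid", $"timestamp").show()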

Next we simply aggregate over the groups and find the leads:

import org.apache.spark.sql.functions.{lead, coalesce, min, max}

// regionTimeEnd is the start of the next stay; for the last stay there is no
// next row, so we fall back to the maximum timestamp of the stay itself.
// w is the same window as above (partitioned by uid, ordered by timestamp),
// now applied to the aggregated frame.
val regionTimeEnd = coalesce(lead($"timestamp", 1).over(w), $"max_")

val result = dfWithInd
  .groupBy($"uid", $"region", $"ind")
  .agg(min($"timestamp").alias("timestamp"), max($"timestamp").alias("max_"))
  .drop("ind")
  .withColumn("regionTimeEnd", regionTimeEnd)
  .withColumnRenamed("timestamp", "regionTimeStart")
  .drop("max_")

result.show

// +---+------+---------------+-------------+
// |uid|region|regionTimeStart|regionTimeEnd|
// +---+------+---------------+-------------+
// |  a|     1|              1|            5|
// |  a|     2|              5|            8|
// |  a|     3|              8|            9|
// |  a|     4|              9|           13|
// |  a|     1|             13|           15|
// |  a|     3|             15|           17|
// |  a|     5|             17|           20|
// +---+------+---------------+-------------+
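
If you also need how much time was spent in each visit (as asked in the question), it can be derived from the boundaries above; a minimal sketch, assuming the same result dataframe:

// Duration of each stay, computed from the start and end boundaries.
val withDuration = result
  .withColumn("duration", $"regionTimeEnd" - $"regionTimeStart")

withDuration.show()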
