如何在时间戳值上使用 lag 和 rangeBetween 函数? [英] How to use lag and rangeBetween functions on timestamp values?

查看:27
本文介绍了如何在时间戳值上使用 lag 和 rangeBetween 函数?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有这样的数据:

userid,eventtime,location_point
4e191908,2017-06-04 03:00:00,18685891
4e191908,2017-06-04 03:04:00,18685891
3136afcb,2017-06-04 03:03:00,18382821
661212dd,2017-06-04 03:06:00,80831484
40e8a7c3,2017-06-04 03:12:00,18825769

如果在同一 location_point 的 5 分钟窗口内有 2 个或更多userid,我想添加一个新的布尔列,该列标记为 true.我有一个想法,使用 lag 函数来查找由 userid 划分的窗口以及当前时间戳和接下来 5 分钟之间的范围:

I would like to add a new boolean column that marks true if there are 2 or moreuserid within a 5 minutes window in the same location_point. I had an idea of using lag function to lookup over a window partitioned by the userid and with the range between the current timestamp and the next 5 minutes:

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col

days = lambda i: i * 60*5 

windowSpec = W.partitionBy(col("userid")).orderBy(col("eventtime").cast("timestamp").cast("long")).rangeBetween(0, days(5))

lastURN = F.lag(col("location_point"), 1).over(windowSpec)
visitCheck = (last_location_point == output.location_pont)
output.withColumn("visit_check", visitCheck).select("userid","eventtime", "location_pont", "visit_check")

当我使用 RangeBetween 函数时,这段代码给了我一个分析异常:

This code is giving me an analysis exception when I use the RangeBetween function:

AnalysisException:当前行和 1500 之间的 u'Window Frame RANGEFOLLOWING 必须匹配所需的帧 ROWS BETWEEN 1 PRECEDING AND 1前面;

AnalysisException: u'Window Frame RANGE BETWEEN CURRENT ROW AND 1500 FOLLOWING must match the required frame ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING;

你知道解决这个问题的方法吗?

Do you know any way to tackle this problem?

推荐答案

鉴于您的数据:

让我们添加一个以秒为单位的时间戳列:

Let's add a column with a timestamp in seconds:

df = df.withColumn('timestamp',df_taf.eventtime.astype('Timestamp').cast("long"))
df.show()

+--------+-------------------+--------------+----------+
|  userid|          eventtime|location_point| timestamp|  
+--------+-------------------+--------------+----------+
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|
+--------+-------------------+--------------+----------+  

现在,让我们定义一个窗口函数,按位置点划分,按时间戳排序,范围在 -300 秒和当前时间之间.我们可以计算这个窗口中元素的数量,并将这些数据放在名为occurences in_5_min"的列中:

Now, let's define a window function, with a partition by location_point, an order by timestamp and a range between -300s and current time. We can count the number of elements in this window and put these data in a column named 'occurences in_5_min':

w = Window.partitionBy('location_point').orderBy('timestamp').rangeBetween(-60*5,0)
df = df.withColumn('occurrences_in_5_min',F.count('timestamp').over(w))
df.show()

+--------+-------------------+--------------+----------+--------------------+
|  userid|          eventtime|location_point| timestamp|occurrences_in_5_min|
+--------+-------------------+--------------+----------+--------------------+
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|                   1|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|                   1|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|                   1|
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|                   1|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|                   2|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|                   1|
+--------+-------------------+--------------+----------+--------------------+

现在,如果特定位置在过去 5 分钟内出现的次数严格超过 1,您可以使用 True 添加所需的列:

Now you can add the desired column with True if the number of occurences is strictly more than 1 in the last 5 minutes on a particular location:

add_bool = udf(lambda col : True if col>1 else False, BooleanType())
df = df.withColumn('already_occured',add_bool('occurrences_in_5_min'))
df.show()

+--------+-------------------+--------------+----------+--------------------+---------------+
|  userid|          eventtime|location_point| timestamp|occurrences_in_5_min|already_occured|
+--------+-------------------+--------------+----------+--------------------+---------------+
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|                   1|          false|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|                   1|          false|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|                   1|          false|
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|                   1|          false|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|                   2|           true|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|                   1|          false|
+--------+-------------------+--------------+----------+--------------------+---------------+

这篇关于如何在时间戳值上使用 lag 和 rangeBetween 函数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆