Spark: marking duplicate user logins within 24 hours after first login
Question
I have a dataset with users and login times. I need to mark a login as a duplicate if there are additional logins within the 24-hour period after the first login. The activity window opens with a user's login. For example, here is a sample dataset:
user login
-----------------------------
user1 12/1/19 8:00
user1 12/1/19 10:00
user1 12/1/19 23:00
user1 12/2/19 7:00
user1 12/2/19 8:00
user1 12/2/19 10:00
user1 12/3/19 23:00
user1 12/4/19 7:00
user2 12/4/19 8:00
user2 12/5/19 5:00
user2 12/6/19 0:00
Expected result
user login Duplicate
---------------------------------
user1 12/1/19 8:00 N this is first login for user1 - 24 hour window opens here
user1 12/1/19 10:00 Y since this is within 24 hours
user1 12/1/19 23:00 Y
user1 12/2/19 7:00 Y
user1 12/2/19 8:00 Y
user1 12/2/19 10:00 N This activity time is greater than (previous window open + 24 hrs). previous window closes and second window opens here
user1 12/3/19 23:00 N
user1 12/4/19 7:00 Y
user2 12/4/19 8:00 N
user2 12/5/19 5:00 Y
user2 12/6/19 0:00 N
I looked at Spark SQL window functions with complex conditions, but that solution won't work if user logins occur at a fixed interval, say every 18 hours.
Here is another example: if a solution considers only the first activity to calculate the 24-hour window, it will give an incorrect result (non-duplicate) for record #7 below.
user1 12/1/19 8:00 N this is first login for user1 - 24 hour window opens here
user1 12/1/19 10:00 Y since this is within 24 hours
user1 12/1/19 23:00 Y
user1 12/2/19 7:00 Y
user1 12/2/19 8:00 Y
user1 12/2/19 10:00 N This activity time is greater than (previous window open + 24 hrs). previous window closes and second window opens here
**user1 12/3/19 09:00 N**
user1 12/3/19 23:00 N
user1 12/4/19 7:00 Y
user2 12/4/19 8:00 N
user2 12/5/19 5:00 Y
user2 12/6/19 0:00 N
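To illustrate why a fixed-window approach fails here, the sketch below (plain Scala, hypothetical helper names, no Spark needed) buckets user1's logins from the example above into fixed 24-hour windows measured from the first login only. Record #7 (12/3 09:00) comes out as "N", whereas the dynamic-session logic in the expected output marks it "Y":

```scala
// user1's logins from the example, expressed as hours after the first login (12/1 8:00)
val hrs = Seq(0, 2, 15, 23, 24, 26, 49, 63, 71)

// Naive approach: window k covers [24k, 24(k+1)) hours after the FIRST login only.
val buckets = hrs.map(_ / 24)

// The first login falling in each bucket is "N"; the rest are "Y".
val naiveFlags = buckets.zipWithIndex.map { case (b, i) =>
  if (i == 0 || buckets(i - 1) != b) "N" else "Y"
}

// Record #7 (49 h = 12/3 09:00) happens to start bucket 2, so it is wrongly
// flagged "N"; the desired session logic flags it "Y", since it falls within
// 24 h of the window that opened at 12/2 10:00.
println(naiveFlags.mkString(" "))  // → N Y Y Y N Y N Y Y
```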
Answer
I'm not aware of any built-in Spark function that can successively identify the start of the next 24-hour session (or any given time period) based on where the previous session ends, in a dynamic fashion. One approach to handle such a requirement is a UDF that leverages Scala's foldLeft:
def dupeFlags(tLimit: Long) = udf { (logins: Seq[String], tsDiffs: Seq[Long]) =>
  val flags = tsDiffs.foldLeft( (List[String](), 0L) ){ case ((flags, tsAcc), ts) =>
    if (ts == 0 || tsAcc + ts > tLimit)
      ("N" :: flags, 0L)
    else
      ("Y" :: flags, tsAcc + ts)
  }._1.reverse
  logins zip flags
}
The UDF takes a list of time-diff values (in seconds, between the current and previous rows) to be processed. Note that the accumulator for foldLeft in the UDF is a Tuple of (flags, tsAcc), where:
- flags is the list of duplicate-flags to be returned
- tsAcc carries the conditionally accumulated timestamp value over to the next iteration
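Since the fold itself is plain Scala, the accumulator logic can be traced outside Spark. The sketch below (a hypothetical standalone rewrite of the UDF body, with tLimit in seconds) runs the same foldLeft on user1's gaps from the first example and reproduces the expected flags:

```scala
// Standalone version of the fold used inside the UDF (no Spark required).
// tsDiffs holds the seconds between consecutive logins; 0 marks the first login.
def sessionFlags(tLimit: Long)(tsDiffs: Seq[Long]): List[String] =
  tsDiffs.foldLeft((List[String](), 0L)) { case ((flags, tsAcc), ts) =>
    if (ts == 0 || tsAcc + ts > tLimit)
      ("N" :: flags, 0L)         // limit exceeded (or first row): open a new window
    else
      ("Y" :: flags, tsAcc + ts) // still inside the window: keep accumulating
  }._1.reverse

// user1's gaps from the first example, in hours: 0, 2, 13, 8, 1, 2, 37, 8
val diffs = Seq(0L, 2L, 13L, 8L, 1L, 2L, 37L, 8L).map(_ * 3600)
println(sessionFlags(24 * 3600)(diffs).mkString(" "))  // → N Y Y Y Y N N Y
```

Note how 12/2 10:00 (26 h of accumulated gap) exceeds the limit and resets the accumulator, exactly as in the expected output.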
Also note that the list of login-date values is only "passed thru" in order to be included in the final dataset.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("user1", "12/1/19 8:00"),
("user1", "12/1/19 10:00"),
("user1", "12/1/19 23:00"),
("user1", "12/2/19 7:00"),
("user1", "12/2/19 8:00"),
("user1", "12/2/19 10:00"),
("user1", "12/3/19 9:00"),
("user1", "12/3/19 23:00"),
("user1", "12/4/19 7:00"),
("user2", "12/4/19 8:00"),
("user2", "12/5/19 5:00"),
("user2", "12/6/19 0:00")
).toDF("user", "login")
Using groupBy/collect_list, the list of time-diff values along with the login-date list is fed to the UDF to generate the wanted duplicate-flags, which are then flattened using explode:
val win1 = Window.partitionBy("user").orderBy("ts")

df.
  withColumn("ts", unix_timestamp(to_timestamp($"login", "MM/dd/yy HH:mm"))).
  withColumn("tsPrev", coalesce(lag($"ts", 1).over(win1), $"ts")).
  groupBy("user").agg(
    collect_list($"login").as("logins"),
    collect_list($"ts" - $"tsPrev").as("tsDiffs")
  ).
  withColumn("tuple", explode(dupeFlags(60 * 60 * 24L)($"logins", $"tsDiffs"))).
  select($"user", $"tuple._1".as("login"), $"tuple._2".as("duplicate")).
  show
// +-----+-------------+---------+
// | user| login|duplicate|
// +-----+-------------+---------+
// |user1| 12/1/19 8:00| N|
// |user1|12/1/19 10:00| Y|
// |user1|12/1/19 23:00| Y|
// |user1| 12/2/19 7:00| Y|
// |user1| 12/2/19 8:00| Y|
// |user1|12/2/19 10:00| N|
// |user1| 12/3/19 9:00| Y|
// |user1|12/3/19 23:00| N|
// |user1| 12/4/19 7:00| Y|
// |user2| 12/4/19 8:00| N|
// |user2| 12/5/19 5:00| Y|
// |user2| 12/6/19 0:00| N|
// +-----+-------------+---------+