Spark marking duplicate user login within 24 hours after first login
Problem description
I have a dataset with users and login times. I need to mark a login as duplicate if there are additional logins within the 24-hour period after the first login. The activity window opens with a user login. For example, here is a sample data set:
user     login
-----------------------------
user1    12/1/19 8:00
user1    12/1/19 10:00
user1    12/1/19 23:00
user1    12/2/19 7:00
user1    12/2/19 8:00
user1    12/2/19 10:00
user1    12/3/19 23:00
user1    12/4/19 7:00
user2    12/4/19 8:00
user2    12/5/19 5:00
user2    12/6/19 0:00
Expected result
user     login          Duplicate
---------------------------------
user1    12/1/19 8:00   N    this is the first login for user1 - 24 hour window opens here
user1    12/1/19 10:00  Y    since this is within 24 hours
user1    12/1/19 23:00  Y
user1    12/2/19 7:00   Y
user1    12/2/19 8:00   Y
user1    12/2/19 10:00  N    this activity time is greater than (previous window open + 24 hrs); the previous window closes and a second window opens here
user1    12/3/19 23:00  N
user1    12/4/19 7:00   Y
user2    12/4/19 8:00   N
user2    12/5/19 5:00   Y
user2    12/6/19 0:00   N
I looked at Spark SQL window function with complex condition, but that solution won't work if user logins come at a fixed interval, say every 18 hours.
Here is another example: if a solution considers only the first activity to calculate the 24-hour window, it will give an incorrect result (non-duplicate) for record #7 below. A sketch of such an approach follows the table.
user1    12/1/19 8:00   N    this is the first login for user1 - 24 hour window opens here
user1    12/1/19 10:00  Y    since this is within 24 hours
user1    12/1/19 23:00  Y
user1    12/2/19 7:00   Y
user1    12/2/19 8:00   Y
user1    12/2/19 10:00  N    this activity time is greater than (previous window open + 24 hrs); the previous window closes and a second window opens here
**user1  12/3/19 09:00  N**  <- the incorrect flag; the correct result is Y
user1    12/3/19 23:00  N
user1    12/4/19 7:00   Y
user2    12/4/19 8:00   N
user2    12/5/19 5:00   Y
user2    12/6/19 0:00   N
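For illustration only, here is a minimal sketch (my own, not a proposed solution) of such a first-login-anchored approach, assuming a DataFrame df with the user and login columns shown above; the bucket and naive names are hypothetical. Because every 24-hour bucket is anchored to the user's very first login, 12/3/19 09:00 opens "bucket 2" and gets flagged N:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Bucket index = whole 24-hour periods elapsed since the user's FIRST login.
val byUser = Window.partitionBy("user")
val naive = df.
  withColumn("ts", unix_timestamp(to_timestamp($"login", "MM/dd/yy HH:mm"))).
  withColumn("bucket", (($"ts" - min($"ts").over(byUser)) / (60 * 60 * 24)).cast("long")).
  // The first login in each fixed bucket is flagged N -- wrongly so for 12/3/19 9:00,
  // which should be Y because the *dynamic* second window opened at 12/2/19 10:00.
  withColumn("duplicate",
    when(row_number().over(Window.partitionBy("user", "bucket").orderBy("ts")) === 1, "N").
    otherwise("Y"))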
I'm not aware of any built-in Spark function that can successively identify the start of the next 24-hour session (or of any given time period) based on where the previous session ends, in such a dynamic fashion. One approach to handle this requirement is a UDF that leverages Scala's foldLeft:
def dupeFlags(tLimit: Long) = udf{ (logins: Seq[String], tsDiffs: Seq[Long]) =>
  // Fold over the per-row time diffs, carrying (flags-so-far, accumulated seconds).
  val flags = tsDiffs.foldLeft( (List[String](), 0L) ){ case ((flags, tsAcc), ts) =>
    if (ts == 0 || tsAcc + ts > tLimit)
      ("N" :: flags, 0L)          // first login, or window exceeded: open a new window
    else
      ("Y" :: flags, tsAcc + ts)  // still inside the current window: duplicate
  }._1.reverse                    // flags were prepended, so restore original order
  logins zip flags                // pair each login with its flag
}
The UDF takes the list of time-diff values (in seconds, between the current and previous rows) to be processed. Note that the accumulator for foldLeft in the UDF is a tuple of (flags, tsAcc), where:

- flags is the list of duplicate-flags to be returned
- tsAcc carries the conditionally cumulated timestamp value over to the next iteration

Also note that the list of login-date values is only "passed through" in order to be included in the final dataset.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("user1", "12/1/19 8:00"),
("user1", "12/1/19 10:00"),
("user1", "12/1/19 23:00"),
("user1", "12/2/19 7:00"),
("user1", "12/2/19 8:00"),
("user1", "12/2/19 10:00"),
("user1", "12/3/19 9:00"),
("user1", "12/3/19 23:00"),
("user1", "12/4/19 7:00"),
("user2", "12/4/19 8:00"),
("user2", "12/5/19 5:00"),
("user2", "12/6/19 0:00")
).toDF("user", "login")
Using groupBy/collect_list, the time-diff list along with the login-date list are fed to the UDF to generate the wanted duplicate-flags, which then get flattened using explode:
val win1 = Window.partitionBy("user").orderBy("ts")

df.
  withColumn("ts", unix_timestamp(to_timestamp($"login", "MM/dd/yy HH:mm"))).  // epoch seconds
  withColumn("tsPrev", coalesce(lag($"ts", 1).over(win1), $"ts")).  // previous login per user (or self for the first row, making its diff 0)
  groupBy("user").agg(
    collect_list($"login").as("logins"),
    collect_list($"ts" - $"tsPrev").as("tsDiffs")  // seconds since the previous login
  ).
  withColumn("tuple", explode(dupeFlags(60 * 60 * 24L)($"logins", $"tsDiffs"))).
  select($"user", $"tuple._1".as("login"), $"tuple._2".as("duplicate")).
  show
// +-----+-------------+---------+
// | user| login|duplicate|
// +-----+-------------+---------+
// |user1| 12/1/19 8:00| N|
// |user1|12/1/19 10:00| Y|
// |user1|12/1/19 23:00| Y|
// |user1| 12/2/19 7:00| Y|
// |user1| 12/2/19 8:00| Y|
// |user1|12/2/19 10:00| N|
// |user1| 12/3/19 9:00| Y|
// |user1|12/3/19 23:00| N|
// |user1| 12/4/19 7:00| Y|
// |user2| 12/4/19 8:00| N|
// |user2| 12/5/19 5:00| Y|
// |user2| 12/6/19 0:00| N|
// +-----+-------------+---------+
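Note that record #7 (12/3/19 9:00) is now correctly flagged Y: the second window opened at 12/2/19 10:00, and that login falls only 23 hours into it.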