Spark marking duplicate user logins within 24 hours after first login


Problem Description

I have a dataset with users and login times. I need to mark a login as a duplicate if there are additional logins within the 24-hour period after the first login. The activity window opens with a user's login. For example, here is a sample dataset:

user    login
-----------------------------
user1   12/1/19 8:00
user1   12/1/19 10:00
user1   12/1/19 23:00
user1   12/2/19 7:00
user1   12/2/19 8:00
user1   12/2/19 10:00
user1   12/3/19 23:00
user1   12/4/19 7:00
user2   12/4/19 8:00
user2   12/5/19 5:00
user2   12/6/19 0:00

Expected result

user    login           Duplicate
---------------------------------
user1   12/1/19 8:00    N    this is first login for user1 - 24 hour window opens here
user1   12/1/19 10:00   Y    since this is within 24 hours 
user1   12/1/19 23:00   Y   
user1   12/2/19 7:00    Y
user1   12/2/19 8:00    Y
user1   12/2/19 10:00   N   This activity time is greater than (previous window open + 24 hrs). previous window closes and second window opens here
user1   12/3/19 23:00   N
user1   12/4/19 7:00    Y
user2   12/4/19 8:00    N
user2   12/5/19 5:00    Y
user2   12/6/19 0:00    N

I looked at Spark SQL window function with complex condition, but that solution won't work if user logins occur at a fixed interval, say every 18 hours.

Here is another example (if a solution considers only the first activity to calculate the 24-hour window, it will give an incorrect result (non-duplicate) for record #7 below):

user1   12/1/19 8:00    N    this is first login for user1 - 24 hour window opens here
user1   12/1/19 10:00   Y    since this is within 24 hours 
user1   12/1/19 23:00   Y   
user1   12/2/19 7:00    Y
user1   12/2/19 8:00    Y
user1   12/2/19 10:00   N  This activity time is greater than (previous window open + 24 hrs). previous window closes and second window opens here
**user1   12/3/19 09:00 N**
user1   12/3/19 23:00   N
user1   12/4/19 7:00    Y
user2   12/4/19 8:00    N
user2   12/5/19 5:00    Y
user2   12/6/19 0:00    N

Solution

I'm not aware of any built-in Spark function that can successively identify the start of the next 24-hour session (or any given time period) based on where the previous session ends in a dynamic fashion. One approach to handle such a requirement is through a UDF that leverages Scala's fold function:

// Returns (login, flag) pairs: "N" when a new 24-hour window opens, "Y" when the
// login falls inside the currently open window. tLimit is the window size in seconds.
def dupeFlags(tLimit: Long) = udf{ (logins: Seq[String], tsDiffs: Seq[Long]) =>
  val flags = tsDiffs.foldLeft( (List[String](), 0L) ){ case ((flags, tsAcc), ts) =>
    if (ts == 0 || tsAcc + ts > tLimit)
      ("N" :: flags, 0L)          // first login, or window expired: reset the accumulator
    else
      ("Y" :: flags, tsAcc + ts)  // still within the open window: keep accumulating
  }._1.reverse
  logins zip flags                // pass the login dates through alongside their flags
}

The UDF takes a list of time-diffs (in seconds, between the current and the previous row) to be processed. Note that the accumulator for foldLeft in the UDF is a tuple of (flags, tsAcc), where:

  • flags is the list of duplicate flags to be returned
  • tsAcc carries the conditionally accumulated time difference over to the next iteration

Also note that the list of login dates is only "passed through" so that it can be included in the final dataset.
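
To make the accumulator's behavior concrete, here is a minimal, self-contained sketch of the fold logic by itself (no Spark involved), applied to the gaps between user1's consecutive logins from the example above. Purely for readability, the gaps and the limit are expressed in hours here, whereas the actual UDF works in seconds:

// Hour gaps between user1's consecutive logins (0 marks the first login)
val tLimit  = 24L
val tsDiffs = Seq(0L, 2L, 13L, 8L, 1L, 2L, 23L, 14L, 8L)

val flags = tsDiffs.foldLeft((List[String](), 0L)) { case ((flags, tsAcc), ts) =>
  if (ts == 0 || tsAcc + ts > tLimit) ("N" :: flags, 0L)       // window (re)opens, accumulator resets
  else                                ("Y" :: flags, tsAcc + ts)
}._1.reverse
// flags: List(N, Y, Y, Y, Y, N, Y, N, Y)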

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("user1", "12/1/19 8:00"),
  ("user1", "12/1/19 10:00"),
  ("user1", "12/1/19 23:00"),
  ("user1", "12/2/19 7:00"),
  ("user1", "12/2/19 8:00"),
  ("user1", "12/2/19 10:00"),
  ("user1", "12/3/19 9:00"),
  ("user1", "12/3/19 23:00"),
  ("user1", "12/4/19 7:00"),
  ("user2", "12/4/19 8:00"),
  ("user2", "12/5/19 5:00"),
  ("user2", "12/6/19 0:00")
).toDF("user", "login")

Using groupBy/collect_list, the list of time-diffs along with the list of login dates is fed to the UDF to generate the wanted duplicate flags, which then get flattened using explode:

val win1 = Window.partitionBy("user").orderBy("ts")

df.
  withColumn("ts", unix_timestamp(to_timestamp($"login", "MM/dd/yy HH:mm"))).  // login string -> unix timestamp (seconds)
  withColumn("tsPrev", coalesce(lag($"ts", 1).over(win1), $"ts")).             // previous login's ts (first row falls back to its own ts)
  groupBy("user").agg(collect_list($"login").as("logins"), collect_list($"ts" - $"tsPrev").as("tsDiffs")).
  withColumn("tuple", explode(dupeFlags(60 * 60 * 24L)($"logins", $"tsDiffs"))).  // 24-hour limit in seconds
  select($"user", $"tuple._1".as("login"), $"tuple._2".as("duplicate")).
  show
// +-----+-------------+---------+
// | user|        login|duplicate|
// +-----+-------------+---------+
// |user1| 12/1/19 8:00|        N|
// |user1|12/1/19 10:00|        Y|
// |user1|12/1/19 23:00|        Y|
// |user1| 12/2/19 7:00|        Y|
// |user1| 12/2/19 8:00|        Y|
// |user1|12/2/19 10:00|        N|
// |user1| 12/3/19 9:00|        Y|
// |user1|12/3/19 23:00|        N|
// |user1| 12/4/19 7:00|        Y|
// |user2| 12/4/19 8:00|        N|
// |user2| 12/5/19 5:00|        Y|
// |user2| 12/6/19 0:00|        N|
// +-----+-------------+---------+
