Spark marking duplicate user logins within 24 hours after first login


Problem description

I have a dataset with users and login times. I need to mark a login as duplicate if there are additional logins within the 24-hour period after the first login. The activity window opens with a user's login. For example, here is a sample dataset:


user    login
-----------------------------
user1   12/1/19 8:00
user1   12/1/19 10:00
user1   12/1/19 23:00
user1   12/2/19 7:00
user1   12/2/19 8:00
user1   12/2/19 10:00
user1   12/3/19 23:00
user1   12/4/19 7:00
user2   12/4/19 8:00
user2   12/5/19 5:00
user2   12/6/19 0:00

Expected result


user    login           Duplicate
---------------------------------
user1   12/1/19 8:00    N    this is first login for user1 - 24 hour window opens here
user1   12/1/19 10:00   Y    since this is within 24 hours 
user1   12/1/19 23:00   Y   
user1   12/2/19 7:00    Y
user1   12/2/19 8:00    Y
user1   12/2/19 10:00   N   This activity time is greater than (previous window open + 24 hrs). previous window closes and second window opens here
user1   12/3/19 23:00   N
user1   12/4/19 7:00    Y
user2   12/4/19 8:00    N
user2   12/5/19 5:00    Y
user2   12/6/19 0:00    N

I looked at Spark SQL window function with complex conditions, but that solution won't work if the user's logins come at a fixed interval, say every 18 hours: no single gap between consecutive logins exceeds 24 hours, yet a new window should still open at every other login (36 hours after the previous window opened).

Here is another example: if a solution considers only the first activity to calculate the 24-hour window, it will give an incorrect result (non-duplicate) for record #7 below, even though 12/3/19 9:00 still falls inside the window that opened at 12/2/19 10:00 and should therefore be flagged Y.


user1   12/1/19 8:00    N    this is first login for user1 - 24 hour window opens here
user1   12/1/19 10:00   Y    since this is within 24 hours 
user1   12/1/19 23:00   Y   
user1   12/2/19 7:00    Y
user1   12/2/19 8:00    Y
user1   12/2/19 10:00   N  This activity time is greater than (previous window open + 24 hrs). previous window closes and second window opens here
user1   12/3/19 9:00    N   <-- record #7: this N is what a first-activity-only window would give; the correct flag is Y
user1   12/3/19 23:00   N
user1   12/4/19 7:00    Y
user2   12/4/19 8:00    N
user2   12/5/19 5:00    Y
user2   12/6/19 0:00    N
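
To pin the rule down, here is a small plain-Scala sketch (the flagDuplicates helper and the java.time parsing are illustrative only): a window opens at the first login and at any login later than the current window start plus 24 hours, and every login inside an open window is flagged Y.

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("M/d/yy H:mm")

// Illustrative helper: walks one user's logins in order and assigns N/Y flags.
def flagDuplicates(logins: Seq[String]): Seq[(String, String)] = {
  var windowStart: LocalDateTime = null
  logins.map { s =>
    val t = LocalDateTime.parse(s, fmt)
    if (windowStart == null || t.isAfter(windowStart.plusHours(24))) {
      windowStart = t          // open a new 24-hour window at this login
      (s, "N")
    } else (s, "Y")            // falls inside the currently open window
  }
}

flagDuplicates(Seq(
  "12/1/19 8:00", "12/1/19 10:00", "12/1/19 23:00", "12/2/19 7:00", "12/2/19 8:00",
  "12/2/19 10:00", "12/3/19 9:00", "12/3/19 23:00", "12/4/19 7:00"
)).foreach(println)
// record #7 (12/3/19 9:00) comes out as Y, since it is within 24 hours of 12/2/19 10:00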

Recommended answer

I'm not aware of any built-in Spark function that can successively identify the start of the next 24-hour session (or any given time period) based on where the previous session ends, in a dynamic fashion. One approach to handle such a requirement is a UDF that leverages Scala's fold function:

// tLimit: window length in seconds; logins: the user's login strings in order;
// tsDiffs: seconds elapsed since the previous login (0 for the first row)
def dupeFlags(tLimit: Long) = udf{ (logins: Seq[String], tsDiffs: Seq[Long]) =>
  val flags = tsDiffs.foldLeft( (List[String](), 0L) ){ case ((flags, tsAcc), ts) =>
    if (ts == 0 || tsAcc + ts > tLimit)
      ("N" :: flags, 0L)            // first row, or elapsed time exceeds the limit: open a new window
    else
      ("Y" :: flags, tsAcc + ts)    // still inside the open window: mark duplicate and keep accumulating
  }._1.reverse                      // flags were prepended, so restore row order
  logins zip flags                  // pair each login with its flag
}

The UDF takes a list of time-diffs (in seconds, between the current and the previous row) to be processed. Note that the accumulator for foldLeft in the UDF is a Tuple of (flags, tsAcc), where:

  • flags is the list of duplicate-flags to be returned
  • tsAcc carries the conditionally accumulating timestamp value over to the next iteration

Also note that the list of login dates is only "passed thru" so that it can be included in the final dataset.
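
To see the accumulator at work outside Spark, here is a minimal stand-alone sketch of the same fold; the diff values below correspond to user1's first six logins:

val tLimit = 60 * 60 * 24L
// seconds since the previous login: 0 (first row), 2h, 13h, 8h, 1h, 2h
val tsDiffs = Seq(0L, 7200L, 46800L, 28800L, 3600L, 7200L)
val flags = tsDiffs.foldLeft( (List[String](), 0L) ){ case ((flags, tsAcc), ts) =>
  if (ts == 0 || tsAcc + ts > tLimit) ("N" :: flags, 0L)   // open a new window, reset the accumulator
  else ("Y" :: flags, tsAcc + ts)                          // still inside the window: keep accumulating
}._1.reverse
// flags: List(N, Y, Y, Y, Y, N) -- matching the first six rows of the expected result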

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("user1", "12/1/19 8:00"),
  ("user1", "12/1/19 10:00"),
  ("user1", "12/1/19 23:00"),
  ("user1", "12/2/19 7:00"),
  ("user1", "12/2/19 8:00"),
  ("user1", "12/2/19 10:00"),
  ("user1", "12/3/19 9:00"),
  ("user1", "12/3/19 23:00"),
  ("user1", "12/4/19 7:00"),
  ("user2", "12/4/19 8:00"),
  ("user2", "12/5/19 5:00"),
  ("user2", "12/6/19 0:00")
).toDF("user", "login")

Using groupBy/collect_list, the list of time-diffs, along with the list of login dates, is fed to the UDF to generate the wanted duplicate-flags, which then get flattened using explode:

val win1 = Window.partitionBy("user").orderBy("ts")

// (the date pattern matches the sample data; on Spark 3.x the stricter parser may need
//  spark.sql.legacy.timeParserPolicy=LEGACY for single-digit day/hour values)
df.
  withColumn("ts", unix_timestamp(to_timestamp($"login", "MM/dd/yy HH:mm"))).  // login as epoch seconds
  withColumn("tsPrev", coalesce(lag($"ts", 1).over(win1), $"ts")).             // previous login per user (itself for the first row)
  groupBy("user").agg(collect_list($"login").as("logins"), collect_list($"ts" - $"tsPrev").as("tsDiffs")).
  withColumn("tuple", explode(dupeFlags(60 * 60 * 24L)($"logins", $"tsDiffs"))).  // 24 hours in seconds
  select($"user", $"tuple._1".as("login"), $"tuple._2".as("duplicate")).
  show
// +-----+-------------+---------+
// | user|        login|duplicate|
// +-----+-------------+---------+
// |user1| 12/1/19 8:00|        N|
// |user1|12/1/19 10:00|        Y|
// |user1|12/1/19 23:00|        Y|
// |user1| 12/2/19 7:00|        Y|
// |user1| 12/2/19 8:00|        Y|
// |user1|12/2/19 10:00|        N|
// |user1| 12/3/19 9:00|        Y|
// |user1|12/3/19 23:00|        N|
// |user1| 12/4/19 7:00|        Y|
// |user2| 12/4/19 8:00|        N|
// |user2| 12/5/19 5:00|        Y|
// |user2| 12/6/19 0:00|        N|
// +-----+-------------+---------+
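
The window length itself is not hard-coded either: the "(or any given time period)" mentioned above is just the tLimit argument of dupeFlags, so e.g. a 12-hour window only needs a different argument (a sketch, everything else unchanged):

df.
  withColumn("ts", unix_timestamp(to_timestamp($"login", "MM/dd/yy HH:mm"))).
  withColumn("tsPrev", coalesce(lag($"ts", 1).over(win1), $"ts")).
  groupBy("user").agg(collect_list($"login").as("logins"), collect_list($"ts" - $"tsPrev").as("tsDiffs")).
  withColumn("tuple", explode(dupeFlags(60 * 60 * 12L)($"logins", $"tsDiffs"))).  // 12-hour limit instead of 24
  select($"user", $"tuple._1".as("login"), $"tuple._2".as("duplicate")).
  show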

