Select latest timestamp record after a window operation for every group in the data with Spark Scala


Problem description


I ran a count of attempts by (user, app) over a time window of one day (86400 seconds). I want to extract the rows with the latest timestamp and its count, and remove the unnecessary previous counts. Make sure your answer considers the time window: one user with one device can make multiple attempts in a day or a week, and I want to be able to retrieve those particular moments with the final count in every specific window.

My initial dataset is like this:

val df = sc.parallelize(Seq(
  ("user1", "iphone", "2017-12-22 10:06:18", "Success"),
  ("user1", "iphone", "2017-12-22 11:15:12",  "failed"),
  ("user1", "iphone", "2017-12-22 12:06:18", "Success"),
  ("user1", "iphone", "2017-12-22 09:15:12",  "failed"),
  ("user1", "iphone", "2017-12-20 10:06:18", "Success"),
  ("user1", "iphone", "2017-12-20 11:15:12",  "failed"),
  ("user1", "iphone", "2017-12-20 12:06:18", "Success"),
  ("user1", "iphone", "2017-12-20 09:15:12",  "failed"),
  ("user1", "android", "2017-12-20 09:25:20", "Success"),
  ("user1", "android", "2017-12-20 09:44:22", "Success"),
  ("user1", "android", "2017-12-20 09:58:22", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:20", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:25", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:35", "Success")
)).toDF("username", "device", "date_time", "status")

The code I ran and what I got.

// Basically I'm looking at 1 day, which is 86400 seconds
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w1 = Window.partitionBy("username", "device")
               .orderBy(col("date_time").cast("timestamp").cast("long").desc)
               .rangeBetween(-86400, 0)


val countEveryAttemptDF = df.withColumn("attempts", count("device").over(w1))

Now I have

// countEveryAttemptDF.show
+--------+--------------+---------------------+-------+--------+
|username|        device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|   user1|       android|  2017-12-20 09:58:22|Success|       1|
|   user1|       android|  2017-12-20 09:44:22|Success|       2|
|   user1|       android|  2017-12-20 09:25:20|Success|       3|
|   user1|        iphone|  2017-12-22 12:06:18|Success|       1|
|   user1|        iphone|  2017-12-22 11:15:12| failed|       2|
|   user1|        iphone|  2017-12-22 10:06:18|Success|       3|
|   user1|        iphone|  2017-12-22 09:15:12| failed|       4|
|   user1|        iphone|  2017-12-20 16:44:35|Success|       1|
|   user1|        iphone|  2017-12-20 16:44:25|Success|       2|
|   user1|        iphone|  2017-12-20 16:44:20|Success|       3|
|   user1|        iphone|  2017-12-20 12:06:18|Success|       4|
|   user1|        iphone|  2017-12-20 11:15:12| failed|       5|
|   user1|        iphone|  2017-12-20 10:06:18|Success|       6|
|   user1|        iphone|  2017-12-20 09:15:12| failed|       7|
+--------+--------------+---------------------+-------+--------+

What I want: the latest timestamp along with its count, making sure I stay within the same time window.

+--------+--------------+---------------------+-------+--------+
|username|        device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|   user1|       android|  2017-12-20 09:25:20|Success|       3|
|   user1|        iphone|  2017-12-22 09:15:12| failed|       4|
|   user1|        iphone|  2017-12-20 09:15:12| failed|       7|
+--------+--------------+---------------------+-------+--------+

Solution

You are almost there. You have already figured out the counts by looking over a one-day range. Now all you have to do is find the latest record in that one-day range, which can be done by using last over the same window specification but with the range reversed.

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

def day(x: Int) = x * 86400

val w1 = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(-day(1), 0)
val w2 = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(0, day(1))

val countEveryAttemptDF = df.withColumn("attempts", count("device").over(w1))
                            .withColumn("att", last("attempts").over(w2))
                            .filter(col("attempts") === col("att"))
                            .drop("att")

which should give you

+--------+--------------+---------------------+-------+--------+
|username|        device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|user1   |android       |2017-12-20 09:25:20  |Success|3       |
|user1   |iphone        |2017-12-22 09:15:12  | failed|4       |
|user1   |iphone        |2017-12-20 09:15:12  | failed|7       |
+--------+--------------+---------------------+-------+--------+

Similarly, as stated in the comments below:

There are 86400 seconds in 1 day. I wanted to look back 1 day. Similarly, 3600 seconds is 1 hour, and there are 604,800 seconds in 1 week.

You can change the day function to hour and week functions, as below, and use them in the window's rangeBetween:

def hour(x: Int) = x * 3600
def week(x: Int) = x * 604800
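
As a minimal sketch of how these would plug in (assuming the same df, column names, and imports as above; wHourly and hourlyAttempts are illustrative names, not from the original answer):

// Same pattern as w1 above, but counting attempts over a 1-hour range instead of 1 day
val wHourly = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(-hour(1), 0)

val hourlyAttempts = df.withColumn("attempts", count("device").over(wHourly))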

I hope the answer is helpful.
