PySpark - Combining Session Data without Explicit Session Key / Iterating over All Rows


Question

I am trying to aggregate session data without a true session "key" in PySpark. I have data where an individual is detected in an area at a specific time, and I want to aggregate that into a duration spent in each area during a specific visit (see below).

The tricky part here is that I want to infer the time someone exits each area as the time they are detected in the next area. This means that I will need to use the start time of the next area ID as the end time for any given area ID. Area IDs can also show up more than once for the same individual.

I had an implementation of this in MapReduce where I iterate over all rows and aggregate the time until a new AreaID or Individual is detected, then output the record. Is there a way to do something similar in Spark? Is there a better way to approach the problem?
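
For illustration, a rough single-machine sketch of that row-by-row aggregation might look like the following (plain Python, not the original MapReduce job; it assumes rows are already sorted by individual and detection time):

from datetime import datetime

rows = [  # (individual, area_id, detection time), pre-sorted
    ("IndividualX", "AreaQ", datetime(2015, 7, 1, 0, 0)),
    ("IndividualX", "AreaQ", datetime(2015, 7, 1, 1, 0)),
    ("IndividualX", "AreaW", datetime(2015, 7, 1, 3, 0)),
    ("IndividualX", "AreaQ", datetime(2015, 7, 1, 4, 0)),
    ("IndividualY", "AreaZ", datetime(2015, 7, 2, 4, 0)),
    ("IndividualY", "AreaZ", datetime(2015, 7, 2, 5, 0)),
    ("IndividualY", "AreaW", datetime(2015, 7, 2, 6, 0)),
    ("IndividualY", "AreaT", datetime(2015, 7, 2, 7, 0)),
]

records = []
current = None  # (individual, area_id, start_time) of the open visit
for individual, area_id, ts in rows:
    if current is None or individual != current[0]:
        current = (individual, area_id, ts)   # new individual: start a fresh visit
    elif area_id != current[1]:
        # area changed: this detection closes the previous visit
        start = current[2]
        records.append((individual, current[1], start, ts,
                        (ts - start).total_seconds() / 60))
        current = (individual, area_id, ts)
    # otherwise same individual and area: keep the original start time
# the last open visit per individual is intentionally never emitted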

Also of note, I do not want to output a record unless the individual has been detected in another area (e.g. IndividualY, AreaT below).

I have a dataset in the following format:


Individual  AreaID  Datetime of Detection
IndividualX AreaQ   1/7/2015 0:00
IndividualX AreaQ   1/7/2015 1:00
IndividualX AreaW   1/7/2015 3:00
IndividualX AreaQ   1/7/2015 4:00
IndividualY AreaZ   2/7/2015 4:00
IndividualY AreaZ   2/7/2015 5:00
IndividualY AreaW   2/7/2015 6:00
IndividualY AreaT   2/7/2015 7:00

I want the following output:


Individual  AreaID  Start_Time      End_Time        Duration (minutes)
IndividualX AreaQ   1/7/2015 0:00   1/7/2015 3:00   180
IndividualX AreaW   1/7/2015 3:00   1/7/2015 4:00   60
IndividualY AreaZ   2/7/2015 4:00   2/7/2015 6:00   120
IndividualY AreaW   2/7/2015 6:00   2/7/2015 7:00   60

Answer

It is not a particularly pretty solution, but you can use DataFrames and window functions. Assuming your input looks like this:

rdd = sc.parallelize([
    ("IndividualX", "AreaQ",  "1/7/2015 0:00"),
    ("IndividualX", "AreaQ",  "1/7/2015 1:00"),
    ("IndividualX", "AreaW",  "1/7/2015 3:00"),
    ("IndividualX", "AreaQ",  "1/7/2015 4:00"),
    ("IndividualY", "AreaZ",  "2/7/2015 4:00"),
    ("IndividualY", "AreaZ",  "2/7/2015 5:00"),
    ("IndividualY", "AreaW",  "2/7/2015 6:00"),
    ("IndividualY", "AreaT",  "2/7/2015 7:00")
])

First, we have to convert it to a DataFrame:

from datetime import datetime
from pyspark.sql import Row
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

row = Row("individual", "area_id", "datetime")
fmt = "%d/%m/%Y %H:%M"
df = rdd.map(lambda r: row(r[0], r[1], datetime.strptime(r[2], fmt))).toDF()

Next, let's define a window:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

w = Window().partitionBy("individual").orderBy("datetime")

and temporary columns:

p_area_id = f.lag("area_id").over(w) # Previous area

ind =  f.sum((
    p_area_id.isNull() | # No previous observation
    (p_area_id != f.col("area_id")) # Area changed
).cast("integer")).over(w)

Using the indicator defined above, we can choose the minimum timestamp for each visit to an area:

tmp = (df
   .withColumn("ind", ind)
   .groupBy("individual", "area_id", "ind")
   .agg(f.min("datetime").alias("datetime"))
   .drop("ind"))

Finally, we can define the target columns:

end_time = f.lead(f.col("datetime")).over(w)

duration = (
    f.col("end_time").cast("integer") - f.col("datetime").cast("integer")) / 60

and build the output DataFrame:

result = (tmp
    .withColumn("end_time", end_time)
    .where(f.col("end_time").isNotNull())
    .withColumn("duration", duration)
    .withColumnRenamed("datetime", "start_time"))

And the output:

+-----------+-------+--------------------+--------------------+--------+
| individual|area_id|          start_time|            end_time|duration|
+-----------+-------+--------------------+--------------------+--------+
|IndividualX|  AreaQ|2015-07-01 00:00:...|2015-07-01 03:00:...|   180.0|
|IndividualX|  AreaW|2015-07-01 03:00:...|2015-07-01 04:00:...|    60.0|
|IndividualY|  AreaZ|2015-07-02 04:00:...|2015-07-02 06:00:...|   120.0|
|IndividualY|  AreaW|2015-07-02 06:00:...|2015-07-02 07:00:...|    60.0|
+-----------+-------+--------------------+--------------------+--------+

If you prefer plain RDDs, you can reshape the data to something like this:

(individual, (area_id, datetime))

and then groupByKey and perform the required operations locally.
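
A minimal sketch of that RDD variant, reusing rdd and fmt from above (sessionize here is a hypothetical helper applying the same per-individual logic as the window-based version, not something from the original answer):

def sessionize(detections):
    # detections: iterable of (area_id, datetime) for a single individual
    ordered = sorted(detections, key=lambda x: x[1])
    visits = []
    for area_id, ts in ordered:
        if not visits or visits[-1][0] != area_id:
            visits.append((area_id, ts))  # first detection in a new area
    # pair each visit with the start of the next one to get its end time
    return [(area, start, end, (end - start).total_seconds() / 60)
            for (area, start), (_, end) in zip(visits, visits[1:])]

sessions = (rdd
    .map(lambda r: (r[0], (r[1], datetime.strptime(r[2], fmt))))
    .groupByKey()
    .flatMapValues(sessionize))
# sessions: (individual, (area_id, start_time, end_time, duration_minutes))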
