PySpark - Combining Session Data without Explicit Session Key / Iterating over All Rows


Problem Description

I am trying to aggregate session data without a true session "key" in PySpark. I have data where an individual is detected in an area at a specific time, and I want to aggregate that into a duration spent in each area during a specific visit (see below).

The tricky part here is that I want to infer the time someone exits each area as the time they are detected in the next area. This means that I will need to use the start time of the next area ID as the end time for any given area ID. Area IDs can also show up more than once for the same individual.

I had an implementation of this in MapReduce where I iterated over all rows and aggregated the time until a new AreaID or Individual was detected, then output the record. Is there a way to do something similar in Spark? Is there a better way to approach the problem?

Also of note, I do not want to output a record unless the individual has been detected in another area (e.g., IndividualY in AreaT below).

I have a dataset in the following format:

Individual  AreaID  Datetime of Detection
IndividualX AreaQ   1/7/2015 0:00
IndividualX AreaQ   1/7/2015 1:00
IndividualX AreaW   1/7/2015 3:00
IndividualX AreaQ   1/7/2015 4:00
IndividualY AreaZ   2/7/2015 4:00
IndividualY AreaZ   2/7/2015 5:00
IndividualY AreaW   2/7/2015 6:00
IndividualY AreaT   2/7/2015 7:00

I would like the desired output of:

Individual  AreaID  Start_Time      End_Time        Duration (minutes)
IndividualX AreaQ   1/7/2015 0:00   1/7/2015 3:00   180
IndividualX AreaW   1/7/2015 3:00   1/7/2015 4:00   60
IndividualY AreaZ   2/7/2015 4:00   2/7/2015 6:00   120
IndividualY AreaW   2/7/2015 6:00   2/7/2015 7:00   60

Solution

It is not a particularly pretty solution, but you can use DataFrames and window functions. Assuming your input looks like this:

rdd = sc.parallelize([
    ("IndividualX", "AreaQ",  "1/7/2015 0:00"),
    ("IndividualX", "AreaQ",  "1/7/2015 1:00"),
    ("IndividualX", "AreaW",  "1/7/2015 3:00"),
    ("IndividualX", "AreaQ",  "1/7/2015 4:00"),
    ("IndividualY", "AreaZ",  "2/7/2015 4:00"),
    ("IndividualY", "AreaZ",  "2/7/2015 5:00"),
    ("IndividualY", "AreaW",  "2/7/2015 6:00"),
    ("IndividualY", "AreaT",  "2/7/2015 7:00")
])

First we have to convert it to a DataFrame:

from datetime import datetime
from pyspark.sql import Row
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

row = Row("individual", "area_id", "datetime")
fmt = "%d/%m/%Y %H:%M"
df = rdd.map(lambda r: row(r[0], r[1], datetime.strptime(r[2], fmt))).toDF()

Next, let's define a window:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

w = Window().partitionBy("individual").orderBy("datetime")

And temporary columns:

p_area_id = f.lag("area_id").over(w) # Previous area

ind =  f.sum((
    p_area_id.isNull() | # No previous observation
    (p_area_id != f.col("area_id")) # Area changed
).cast("integer")).over(w)
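
The running sum turns these change flags into a per-individual visit counter: ind increments whenever the area changes (or at the first observation), so consecutive detections in the same area share the same value. As a quick sanity check (this preview is not part of the original answer; it simply reuses df, p_area_id and ind from above):

# Preview the helper columns; rows belonging to one uninterrupted
# stay in an area should share the same "ind" value.
(df
    .withColumn("p_area_id", p_area_id)
    .withColumn("ind", ind)
    .orderBy("individual", "datetime")
    .show())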

Using the indicator defined above, we can choose the minimum timestamp for each visit to an area:

tmp = (df
   .withColumn("ind", ind)
   .groupBy("individual", "area_id", "ind")
   .agg(f.min("datetime").alias("datetime"))
   .drop("ind"))

Finally, we can define the target columns (casting a timestamp to integer yields Unix time in seconds, hence the division by 60 to get minutes):

end_time = f.lead(f.col("datetime")).over(w)

duration = (
    f.col("end_time").cast("integer") - f.col("datetime").cast("integer")) / 60

and build the output DataFrame:

result = (tmp
    .withColumn("end_time", end_time)
    .where(f.col("end_time").isNotNull())
    .withColumn("duration", duration)
    .withColumnRenamed("datetime", "start_time"))

And output:

+-----------+-------+--------------------+--------------------+--------+
| individual|area_id|          start_time|            end_time|duration|
+-----------+-------+--------------------+--------------------+--------+
|IndividualX|  AreaQ|2015-07-01 00:00:...|2015-07-01 03:00:...|   180.0|
|IndividualX|  AreaW|2015-07-01 03:00:...|2015-07-01 04:00:...|    60.0|
|IndividualY|  AreaZ|2015-07-02 04:00:...|2015-07-02 06:00:...|   120.0|
|IndividualY|  AreaW|2015-07-02 06:00:...|2015-07-02 07:00:...|    60.0|
+-----------+-------+--------------------+--------------------+--------+
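
If you want the result to match the text format from the question (day/month/year timestamps and whole minutes), a possible post-processing step is sketched below. It is not part of the original answer, assumes Spark 1.5+ (where date_format is available), and the formatted name is just for illustration:

# Render timestamps as d/M/yyyy H:mm strings and durations as integer minutes.
formatted = (result
    .select(
        "individual",
        "area_id",
        f.date_format("start_time", "d/M/yyyy H:mm").alias("start_time"),
        f.date_format("end_time", "d/M/yyyy H:mm").alias("end_time"),
        f.col("duration").cast("integer").alias("duration")))

formatted.show()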

If you prefer plain RDDs, you can reshape your data to something like this:

(individual, (area_id, datetime))

and then groupByKey and perform the required operations locally, as sketched below.
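
A minimal sketch of that RDD variant follows. It is not part of the original answer; it reuses the rdd, fmt and datetime import from above, and sessionize is just an illustrative helper name:

def sessionize(records):
    # records: iterable of (area_id, datetime) pairs for one individual
    events = sorted(records, key=lambda x: x[1])

    # Collapse consecutive detections in the same area, keeping the first timestamp.
    visits = []
    for area_id, ts in events:
        if not visits or visits[-1][0] != area_id:
            visits.append((area_id, ts))

    # The end time of each visit is the start time of the next one;
    # the last visit has no known end time and is dropped.
    out = []
    for (area_id, start), (_, end) in zip(visits, visits[1:]):
        out.append((area_id, start, end, (end - start).total_seconds() / 60))
    return out

sessions = (rdd
    .map(lambda r: (r[0], (r[1], datetime.strptime(r[2], fmt))))
    .groupByKey()
    .flatMapValues(sessionize))

Note that groupByKey pulls every record for a given individual onto a single executor, which is fine as long as no individual has an excessive number of detections.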
