Aggregate over time windows on a partitioned/grouped by window

Problem description

I am new to Spark and learning as I go.

I have this Spark DataFrame. I want to order by date and get the latest record partitioned by 'ID1', 'ID2', and 'record_type'.

My input is like this:

from pyspark.sql import functions as F

data = [
    ("ACC.PXP", "7246", "2018-10-18T16:20:00", "Hospital", None, "IN"),
    ("ACC.PXP", "7246", "2018-10-18T16:20:00", None, "Foundation", "IN"),
    ("ACC.PXP", "7246", "2018-11-10T00:00:00", "Hospital", "Foundation", "IN"),
    ("ACC.PXP", "7246", "2018-11-11T00:00:00", None, "Washington", "OUT"),
    ("ACC.PXP", "7246", "2018-11-12T00:00:00", "Hospital", None, "OUT"),
    ("ACC.PXP", "7246", "2018-11-15T04:00:00", "Home", None, "IN"),
    ("ACC.PXP", "7246", "2018-11-15T04:00:00", "Home", None, "IN"),
    ("ACC.PXP", "7246", "2020-12-04T15:00:00", "Care", "Betel", "OUT"),
    ("ACC.PXP", "7246", "2020-13-04T15:00:00", "Care", None, "OUT"),
]
df = spark.createDataFrame(
    data=data, schema=["ID1", "ID2", "date", "type", "name", "record_type"]
)
df.orderBy(F.col("date")).show(truncate=False)

+-------+----+-------------------+--------+----------+-----------+
|ID1    |ID2 |date               |type    |name      |record_type|
+-------+----+-------------------+--------+----------+-----------+
|ACC.PXP|7246|2018-10-18T16:20:00|null    |Foundation|IN         |
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital|null      |IN         |
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation|IN         |
|ACC.PXP|7246|2018-11-11T00:00:00|null    |Washington|OUT        |
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|null      |OUT        |
|ACC.PXP|7246|2018-11-15T04:00:00|Home    |null      |IN         |
|ACC.PXP|7246|2018-11-15T04:00:00|Home    |null      |IN         |
|ACC.PXP|7246|2020-12-04T15:00:00|Care    |Betel     |OUT        |
|ACC.PXP|7246|2020-13-04T15:00:00|Care    |null      |OUT        |
+-------+----+-------------------+--------+----------+-----------+

... and my expected output will be like

data2 = [
    ("ACC.PXP", "7246", "2018-11-10T00:00:00", "Hospital", "Foundation", "IN"),
    ("ACC.PXP", "7246", "2018-11-12T00:00:00", "Hospital", "Washington", "OUT"),
    ("ACC.PXP", "7246", "2018-11-15T04:00:00", "Home", None, "IN"),
    ("ACC.PXP", "7246", "2020-13-04T15:00:00", "Care", "Betel", "OUT"),
]
sdf = spark.createDataFrame(
    data=data2, schema=["ID1", "ID2", "date", "type", "name", "record_type"]
)
sdf.orderBy(F.col("date")).show(truncate=False)


+-------+----+-------------------+--------+----------+-----------+
|ID1    |ID2 |date               |type    |name      |record_type|
+-------+----+-------------------+--------+----------+-----------+
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation|IN         |
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|Washington|OUT        |
|ACC.PXP|7246|2018-11-15T04:00:00|Home    |null      |IN         |
|ACC.PXP|7246|2020-13-04T15:00:00|Care    |Betel     |OUT        |
+-------+----+-------------------+--------+----------+-----------+

I tried this and it looks like it works for this sample dataset. However, the logic seems to pick only one 'IN' and one 'OUT' record when I tested it on the actual data. Any input is highly appreciated.

from pyspark.sql.functions import col, last, when
from pyspark.sql.window import Window

# w1 (used below to fill 'type' and 'name') is assumed to be defined elsewhere,
# e.g. a window partitioned by "ID1", "ID2" and ordered by "date".
w2 = Window.partitionBy("ID1", "ID2", "type", "date").orderBy(F.desc("date"))
w3 = Window.partitionBy("ID1", "ID2", "type").orderBy(F.asc("date"))
w4 = Window.partitionBy("ID1", "ID2", "type").orderBy(F.desc("date"))

df1 = (
    df.withColumn(
        "type",
        when(col("type").isNotNull(), col("type")).otherwise(
            last("type", True).over(w1)
        ),
    )
    .withColumn(
        "name",
        when(col("name").isNotNull(), col("name")).otherwise(
            last("name", True).over(w1)
        ),
    )
    .withColumn("row_number", F.row_number().over(w2))
    .filter(F.col("row_number") == 1)
    .drop("row_number")
)

df2 = (
    df1.withColumn(
        "type",
        when(col("type").isNotNull(), col("type")).otherwise(
            last("type", True).over(w3)
        ),
    )
    .withColumn(
        "name",
        when(col("name").isNotNull(), col("name")).otherwise(
            F.last("name", True).over(w3)
        ),
    )
    .withColumn("GroupingSeq", F.row_number().over(w4))
    .filter(F.col("GroupingSeq") == 1)
    .drop("GroupingSeq")
)

df2.orderBy(F.asc("date")).show()

Recommended answer

First, you need to assign a group id: flag each row whose record_type differs from the previous row's (per "ID1"/"ID2", ordered by date), then take a running sum of those flags so that every contiguous run of 'IN' or 'OUT' rows gets the same id.

from pyspark.sql import functions as F, Window as W

df2 = (
    df.withColumn(
        "id",
        F.when(
            F.lag("record_type").over(W.partitionBy("ID1", "ID2").orderBy("date"))
            == F.col("record_type"),
            0,
        ).otherwise(1),
    )
    .withColumn("id", F.sum("id").over(W.partitionBy("ID1", "ID2").orderBy("date")))
)

df2.show()
+-------+----+-------------------+--------+----------+-----------+---+          
|    ID1| ID2|               date|    type|      name|record_type| id|
+-------+----+-------------------+--------+----------+-----------+---+
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital|      null|         IN|  1|
|ACC.PXP|7246|2018-10-18T16:20:00|    null|Foundation|         IN|  1|
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation|         IN|  1|
|ACC.PXP|7246|2018-11-11T00:00:00|    null|Washington|        OUT|  2|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|      null|        OUT|  2|
|ACC.PXP|7246|2018-11-15T04:00:00|    Home|      null|         IN|  3|
|ACC.PXP|7246|2018-11-15T04:00:00|    Home|      null|         IN|  3|
|ACC.PXP|7246|2020-12-04T15:00:00|    Care|     Betel|        OUT|  4|
|ACC.PXP|7246|2020-13-04T15:00:00|    Care|      null|        OUT|  4|
+-------+----+-------------------+--------+----------+-----------+---+
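
As a side note, the change flag and its running sum can also be written in a single pass. Below is a minimal sketch equivalent to the two withColumn calls above (the names w_grp and prev are just local names introduced for this example):

from pyspark.sql import functions as F, Window as W

w_grp = W.partitionBy("ID1", "ID2").orderBy("date")

# A row starts a new group when its record_type differs from the previous
# row's (or when there is no previous row); the running sum of that flag
# numbers the contiguous IN/OUT runs.
prev = F.lag("record_type").over(w_grp)
df2 = df.withColumn(
    "id",
    F.sum((prev.isNull() | (prev != F.col("record_type"))).cast("int")).over(w_grp),
)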

Then, you fill in the columns where there are nulls, taking the values from the other rows of the same group:

df3 = df2.withColumn(
    "name", 
    F.coalesce(
        F.col("name"),
        F.max("name").over(W.partitionBy("ID1", "ID2", "id"))
    )
).withColumn(
    "type", 
    F.coalesce(
        F.col("type"),
        F.max("type").over(W.partitionBy("ID1", "ID2", "id"))
    )
)

df3.show()
+-------+----+-------------------+--------+----------+-----------+---+
|    ID1| ID2|               date|    type|      name|record_type| id|
+-------+----+-------------------+--------+----------+-----------+---+
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital|Foundation|         IN|  1|
|ACC.PXP|7246|2018-10-18T16:20:00|Hospital|Foundation|         IN|  1|
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation|         IN|  1|
|ACC.PXP|7246|2018-11-11T00:00:00|Hospital|Washington|        OUT|  2|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|Washington|        OUT|  2|
|ACC.PXP|7246|2018-11-15T04:00:00|    Home|      null|         IN|  3|
|ACC.PXP|7246|2018-11-15T04:00:00|    Home|      null|         IN|  3|
|ACC.PXP|7246|2020-12-04T15:00:00|    Care|     Betel|        OUT|  4|
|ACC.PXP|7246|2020-13-04T15:00:00|    Care|     Betel|        OUT|  4|
+-------+----+-------------------+--------+----------+-----------+---+

Finally, you select the "last" row (latest date) for each ("ID1", "ID2", "id") tuple:

df4 = df3.withColumn(
    "row",
    F.row_number().over(W.partitionBy("ID1", "ID2", "id").orderBy(F.col("date").desc()))
).where("row=1").drop("row", "id")

df4.show()
+-------+----+-------------------+--------+----------+-----------+              
|    ID1| ID2|               date|    type|      name|record_type|
+-------+----+-------------------+--------+----------+-----------+
|ACC.PXP|7246|2018-11-10T00:00:00|Hospital|Foundation|         IN|
|ACC.PXP|7246|2018-11-12T00:00:00|Hospital|Washington|        OUT|
|ACC.PXP|7246|2018-11-15T04:00:00|    Home|      null|         IN|
|ACC.PXP|7246|2020-13-04T15:00:00|    Care|     Betel|        OUT|
+-------+----+-------------------+--------+----------+-----------+
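
Putting the three steps together, a minimal end-to-end sketch built only from the code above (the names grp, by_grp and result are introduced here for convenience):

from pyspark.sql import functions as F, Window as W

grp = W.partitionBy("ID1", "ID2").orderBy("date")
by_grp = W.partitionBy("ID1", "ID2", "id")

result = (
    df
    # Step 1: group id = running count of record_type changes per (ID1, ID2).
    .withColumn(
        "id",
        F.when(
            F.lag("record_type").over(grp) == F.col("record_type"), 0
        ).otherwise(1),
    )
    .withColumn("id", F.sum("id").over(grp))
    # Step 2: fill nulls from the other rows of the same group.
    .withColumn("name", F.coalesce(F.col("name"), F.max("name").over(by_grp)))
    .withColumn("type", F.coalesce(F.col("type"), F.max("type").over(by_grp)))
    # Step 3: keep only the latest row of each group.
    .withColumn(
        "row", F.row_number().over(by_grp.orderBy(F.col("date").desc()))
    )
    .where("row = 1")
    .drop("row", "id")
)
result.orderBy("date").show(truncate=False)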
