Insert missing date rows and insert old values in the new rows (PySpark)

Question

I have a DataFrame that contains a person, a weight and a timestamp, as such:

+-----------+-------------------+------+
|     person|          timestamp|weight|
+-----------+-------------------+------+
|          1|2019-12-02 14:54:17| 49.94|
|          1|2019-12-03 08:58:39| 50.49|
|          1|2019-12-06 10:44:01| 50.24|
|          2|2019-12-02 08:58:39| 62.32|
|          2|2019-12-04 10:44:01| 65.64|
+-----------+-------------------+------+

I want to fill in the gaps so that every person has a row for every date, meaning that the above should become:

+-----------+-------------------+------+
|     person|          timestamp|weight|
+-----------+-------------------+------+
|          1|2019-12-02 14:54:17| 49.94|
|          1|2019-12-03 08:58:39| 50.49|
|          1|2019-12-04 00:00:01| 50.49|
|          1|2019-12-05 00:00:01| 50.49|
|          1|2019-12-06 10:44:01| 50.24|
|          1|2019-12-07 00:00:01| 50.24|
|          1|2019-12-08 00:00:01| 50.24|
|          2|2019-12-02 08:58:39| 62.32|
|          2|2019-12-03 00:00:01| 62.32|
|          2|2019-12-04 10:44:01| 65.64|
|          2|2019-12-05 00:00:01| 65.64|
|          2|2019-12-06 00:00:01| 65.64|
|          2|2019-12-07 00:00:01| 65.64|
|          2|2019-12-08 00:00:01| 65.64|
+-----------+-------------------+------+

I have defined a new table that uses datediff to generate all dates between the min and max date:

from pyspark.sql.functions import min, max, datediff, expr, posexplode

min_max_date = df_person_weights.select(min("timestamp"), max("timestamp")) \
        .withColumnRenamed("min(timestamp)", "min_date") \
        .withColumnRenamed("max(timestamp)", "max_date")

min_max_date = min_max_date.withColumn("datediff", datediff("max_date", "min_date")) \
        .withColumn("repeat", expr("split(repeat(',', datediff), ',')")) \
        .select("*", posexplode("repeat").alias("date", "val")) \
        .withColumn("date", expr("date_add(min_date, date)"))

This gives me a new DataFrame that contains the dates like:

+----------+
|      date|
+----------+
|2019-12-02|
|2019-12-03|
|2019-12-04|
|2019-12-05|
|2019-12-06|
|2019-12-07|
|2019-12-08|
+----------+

I have tried different joins like:

min_max_date.join(df_price_history, min_max_date.date != df_price_history.date, "leftouter")

But I'm not getting the results that I need. Can someone help with this? How do I merge the information I have now?

Answer

You’re looking to forward-fill a dataset. This is made a bit more complex because you need to do it per category (person).

One way to do it would be like this: create a new DataFrame that has all the dates you want to have a value for, per person (see below, this is just dates_by_person).

Then, left-join the original DataFrame to this one, so you start creating the missing rows.

Next, use a window function to find, within each group of person sorted by the date, the last non-null weight. If you can have multiple entries per date (i.e. one person has several records on one specific date), you must also order by the timestamp column.

Finally, coalesce the columns so that any null field gets replaced by the intended value.

from datetime import datetime, timedelta
from itertools import product

import pyspark.sql.functions as psf
from pyspark.sql import Window

data = (  # recreate the DataFrame
    (1, datetime(2019, 12, 2, 14, 54, 17), 49.94),
    (1, datetime(2019, 12, 3, 8, 58, 39), 50.49),
    (1, datetime(2019, 12, 6, 10, 44, 1), 50.24),
    (2, datetime(2019, 12, 2, 8, 58, 39), 62.32),
    (2, datetime(2019, 12, 4, 10, 44, 1), 65.64))
df = spark.createDataFrame(data, schema=("person", "timestamp", "weight"))

# build the full calendar of days between the earliest and latest timestamp,
# then pair every person with every day in that range
min_max_timestamps = df.agg(psf.min(df.timestamp), psf.max(df.timestamp)).head()
first_date, last_date = [ts.date() for ts in min_max_timestamps]
all_days_in_range = [first_date + timedelta(days=d)
                     for d in range((last_date - first_date).days + 1)]
people = [row.person for row in df.select("person").distinct().collect()]
dates_by_person = spark.createDataFrame(product(people, all_days_in_range),
                                        schema=("person", "date"))

# left join keeps every (person, date) pair; days without a measurement
# end up with null timestamp and weight
df2 = (dates_by_person.join(df,
                            (psf.to_date(df.timestamp) == dates_by_person.date)
                            & (dates_by_person.person == df.person),
                            how="left")
       .drop(df.person)
       )
# window per person, ordered by the date (as Unix seconds); the frame ends one
# second before the current row, so last() only sees strictly earlier dates
wind = (Window
        .partitionBy("person")
        .rangeBetween(Window.unboundedPreceding, -1)
        .orderBy(psf.unix_timestamp("date"))
        )
# carry the most recent earlier non-null weight forward
df3 = df2.withColumn("last_weight",
                     psf.last("weight", ignorenulls=True).over(wind))
# fill the gaps: midnight timestamps for generated rows, forward-filled weights
df4 = df3.select(
    df3.person,
    psf.coalesce(df3.timestamp, psf.to_timestamp(df3.date)).alias("timestamp"),
    psf.coalesce(df3.weight, df3.last_weight).alias("weight"))
df4.show()
# +------+-------------------+------+
# |person|          timestamp|weight|
# +------+-------------------+------+
# |     1|2019-12-02 14:54:17| 49.94|
# |     1|2019-12-03 08:58:39| 50.49|
# |     1|2019-12-04 00:00:00| 50.49|
# |     1|2019-12-05 00:00:00| 50.49|
# |     1|2019-12-06 10:44:01| 50.24|
# |     2|2019-12-02 08:58:39| 62.32|
# |     2|2019-12-03 00:00:00| 62.32|
# |     2|2019-12-04 10:44:01| 65.64|
# |     2|2019-12-05 00:00:00| 65.64|
# |     2|2019-12-06 00:00:00| 65.64|
# +------+-------------------+------+
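
Note that rows are only generated up to the date of the latest timestamp in the data (2019-12-06 here); the desired output above runs to 2019-12-08, so to extend the calendar that far, widen all_days_in_range (or derive last_date from the end date you actually need).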

As suggested by David in the comments, if you have a very large number of people, the construction of dates_by_person can be done in a way that doesn’t require bringing everything to the driver. In this example, we’re talking about a small number of integers, nothing big. But if it gets big, try:

dates = spark.createDataFrame(((d,) for d in all_days_in_range),
                              schema=("date",))
people = df.select("person").distinct()
dates_by_person = dates.crossJoin(people)
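
Since dates.crossJoin(people) yields the same (person, date) columns as before, the left join and window logic above work unchanged. On Spark 2.4 or later, the list of days itself can also be built without driver-side Python, for example with the sequence function. A minimal sketch (assuming Spark 2.4+, and reusing df and psf from the example above) that could replace the dates DataFrame:

# Spark 2.4+: one row per calendar day between the earliest and latest
# timestamp, generated entirely on the executors
dates = (df.agg(psf.min("timestamp").alias("min_ts"),
                psf.max("timestamp").alias("max_ts"))
           .select(psf.explode(psf.expr(
               "sequence(to_date(min_ts), to_date(max_ts), interval 1 day)"
           )).alias("date")))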
