Pyspark Column Transformation: Calculate Percentage Change for Each Group in a Column

Problem Description

I'm using Pyspark on my local machine. I have a Spark dataframe with 4.5 million rows and approximately 30,000 different stocks. I need to calculate the percentage change for each stock over time. I've already run orderBy so that all the stocks are grouped together (as shown in the example below).

A simplified example dataframe is below.

df = spark.read.csv("stock_price.txt", header=True, inferSchema=True)
df.show()

**Company**     **Price**
Company_A         100
Company_A         103
Company_A         105
Company_A         107
Company_B          23
Company_B          25
Company_B          28
Company_B          30

My desired output would look like this:

**Company**     **Price**     **%_Change**
Company_A         100              0
Company_A         103              3%
Company_A         105              2%
Company_A         107              2%
Company_B          23              0
Company_B          25              9%
Company_B          28              12%
Company_B          30              7%

The trick (in my opinion) is setting up code that can do two things: 1) identify each time a new stock is listed, and 2) start calculating the percentage change on the second observation for that stock and continue calculating it until the last observation. It needs to start on the second observation since there can't be a percentage change until the second observation occurs.

Recommended Answer

You can achieve this using a window operation. Ideally you would have a column with an id or timestamp to sort by; for the sake of this example, I am using company as the sorting key.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.read.csv("stock_price.txt", header=True, inferSchema=True)
# Window over each company; ideally order by an id/timestamp column instead of "company"
price_window = Window.partitionBy("company").orderBy("company")
# Previous row's price within the company (null on the first row of each company)
df = df.withColumn("prev_value", F.lag(df.price).over(price_window))
# Price difference from the previous row, defaulting to 0 on the first row
df = df.withColumn("diff", F.when(F.isnull(df.price - df.prev_value), 0).otherwise(df.price - df.prev_value))
df.show()

+---------+-----+----------+----+
|  company|price|prev_value|diff|
+---------+-----+----------+----+
|Company_B|   23|      null|   0|
|Company_B|   25|        23|   2|
|Company_B|   28|        25|   3|
|Company_B|   30|        28|   2|
|Company_A|  100|      null|   0|
|Company_A|  103|       100|   3|
|Company_A|  105|       103|   2|
|Company_A|  107|       105|   2|
+---------+-----+----------+----+
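The diff column above is the absolute price difference. To get the percentage change shown in the desired output, divide the difference by the previous value. Below is a minimal sketch extending the answer's code; the column name %_change and the rounding to whole percentages are assumptions based on the sample output.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

price_window = Window.partitionBy("company").orderBy("company")
df = df.withColumn("prev_value", F.lag(df.price).over(price_window))
# Percent change vs. the previous row; 0 on the first row of each company, where lag() is null
df = df.withColumn(
    "%_change",
    F.when(F.isnull(df.prev_value), 0).otherwise(
        F.round((df.price - df.prev_value) / df.prev_value * 100, 0)
    ),
)
df.show()

If you want a trailing % sign as in the desired output, you could concatenate the rounded value with a literal "%" string, but keeping the column numeric is usually more useful for further calculations.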
