Pyspark Column Transformation: Calculate Percentage Change for Each Group in a Column
Question
I'm using Pyspark on my local machine. I have a Spark dataframe with 4.5 million rows and approximately 30,000 different stocks. I need to calculate the percentage change for each stock over time. I've already run orderBy so that all the stocks are grouped together (as shown in the example below).
A simplified example dataframe is below.
df = spark.read.csv("stock_price.txt", header=True, inferSchema=True)
df.show()
**Company**   **Price**
Company_A     100
Company_A     103
Company_A     105
Company_A     107
Company_B     23
Company_B     25
Company_B     28
Company_B     30
The output I'm looking for is something like this:
**Company**   **Price**   **%_Change**
Company_A     100         0
Company_A     103         3%
Company_A     105         2%
Company_A     107         2%
Company_B     23          0
Company_B     25          9%
Company_B     28          12%
Company_B     30          7%
The trick (in my opinion) is writing code that can do two things: 1) identify each time a new stock appears, and 2) start calculating the percentage change at that stock's second observation and continue through its last observation. It has to start at the second observation, since there can't be a percentage change until a second observation exists.
Answer
You can achieve this using a window operation. Ideally you would have a column with an id or timestamp to sort by; for the sake of this example, I am using company as the sorting key.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.read.csv("stock_price.txt", header=True, inferSchema=True)

# Window per company; ideally you would orderBy an id/timestamp column here
price_window = Window.partitionBy("company").orderBy("company")

# Previous price within each company (null for the first row of a group)
df = df.withColumn("prev_value", F.lag(df.price).over(price_window))

# Difference from the previous price, defaulting to 0 for the first observation
df = df.withColumn("diff", F.when(F.isnull(df.price - df.prev_value), 0).otherwise(df.price - df.prev_value))
+---------+-----+----------+----+
| company|price|prev_value|diff|
+---------+-----+----------+----+
|Company_B| 23| null| 0|
|Company_B| 25| 23| 2|
|Company_B| 28| 25| 3|
|Company_B| 30| 28| 2|
|Company_A| 100| null| 0|
|Company_A| 103| 100| 3|
|Company_A| 105| 103| 2|
|Company_A| 107| 105| 2|
+---------+-----+----------+----+