Spark add new column to dataframe with value from previous row
Question
I'm wondering how I can achieve the following in Spark (PySpark).
Initial dataframe:
+--+---+
|id|num|
+--+---+
|4 |9.0|
|3 |7.0|
|2 |3.0|
|1 |5.0|
+--+---+
Resulting dataframe:
+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|    7.0|
|3 |7.0|    3.0|
|2 |3.0|    5.0|
+--+---+-------+
I generally manage to "append" new columns to a dataframe by using something like:

df.withColumn("new_Col", df.num * 10)
However, I have no idea how I can achieve this "shift of rows" for the new column, so that the new column has the value of a field from the previous row (as shown in the example). I also couldn't find anything in the API documentation on how to access a certain row in a DF by index.
Any help would be appreciated.
Answer
You can use the lag window function as follows:
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])

# Unpartitioned window ordered by id; lag fetches num from the previous row.
w = Window().partitionBy().orderBy(col("id"))
# The first row (id = 1) has no predecessor, so its lag is null; na.drop removes it.
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()
## +---+---+-------+
## | id|num|new_col|
## +---+---+-------+
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+
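As a side note, the empty partitionBy() above moves all rows into a single partition before lag is computed. If your data contains a column you can partition by, the same pattern stays distributed. A minimal sketch, reusing the imports above and assuming a hypothetical grouping column group:

# Hypothetical data with an extra "group" column; lag is then computed
# independently (and in parallel) within each group.
df2 = sc.parallelize([
    ("a", 1, 5.0), ("a", 2, 3.0),
    ("b", 1, 2.0), ("b", 2, 8.0),
]).toDF(["group", "id", "num"])

w2 = Window().partitionBy("group").orderBy(col("id"))
df2.select("*", lag("num").over(w2).alias("new_col")).na.drop().show()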
Still, there are some important issues:
- it is highly inefficient if you need a global operation (not partitioned by some other column or columns),
- you need a natural way to order your data.
While the second issue is almost never a problem, the first one can be a deal-breaker. If this is the case you should simply convert your DataFrame to an RDD and compute lag manually. See for example (a short sketch follows these links):
- How to transform data with sliding window over time series data in Pyspark
- Apache Spark Moving Average (written in Scala, but can be adjusted for PySpark. Be sure to read the comments first.)
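For illustration only, here is a minimal sketch of the manual RDD approach, assuming the same df as above (the helper name manual_lag is hypothetical). It sorts the rows globally, attaches a sequential index, and joins each row with its predecessor's value:

from pyspark.sql import Row

def manual_lag(df, order_col, value_col):
    # Sort globally and key every row by its position in that order.
    indexed = (df.rdd
               .sortBy(lambda row: row[order_col])
               .zipWithIndex()                          # (Row, i)
               .map(lambda pair: (pair[1], pair[0])))   # (i, Row)
    # Re-key each row's value to i + 1, so the join pairs row i with row i - 1.
    shifted = indexed.map(lambda pair: (pair[0] + 1, pair[1][value_col]))

    def attach(pair):
        row, prev = pair[1]
        d = row.asDict()
        d["new_col"] = prev
        return Row(**d)

    # The inner join drops the first row, which has no predecessor,
    # matching the na.drop() behaviour of the window version.
    return indexed.join(shifted).map(attach)

manual_lag(df, "id", "num").toDF().orderBy("id").show()

Unlike the empty window, which funnels the whole dataset through a single task, the sort and join here stay distributed across partitions.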
Other useful links:
- https://github.com/UrbanInstitute/pyspark-tutorials/blob/master/05_moving-average-imputation.ipynb
- Spark Window Functions - rangeBetween dates