Replace missing values with mean - Spark Dataframe
Question
I have a Spark Dataframe with some missing values. I would like to perform a simple imputation by replacing the missing values with the mean for that column. I am very new to Spark, so I have been struggling to implement this logic. This is what I have managed to do so far:
a) To do this for a single column (let's say Col A), this line of code seems to work:
import org.apache.spark.sql.functions.{mean, when}

df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
  .first()(0).asInstanceOf[Double])
  .otherwise($"ColA"))
b) However, I have not been able to figure out how to do this for all the columns in my dataframe. I was trying out the map function, but I believe it loops over each row of the dataframe.
c) There is a similar question on SO - here. While I liked that solution (using aggregated tables and coalesce), I am keen to know whether there is a way to do this by looping through each column (I come from R, so iterating over columns with a higher-order function like lapply feels more natural to me).
Thanks!
Answer
火花> = 2.2
You can use org.apache.spark.ml.feature.Imputer (which supports both mean and median strategies).
Scala:
import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer()
  .setInputCols(df.columns)
  .setOutputCols(df.columns.map(c => s"${c}_imputed"))
  .setStrategy("mean")

imputer.fit(df).transform(df)
Python:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=df.columns,
    outputCols=["{}_imputed".format(c) for c in df.columns]
)
imputer.fit(df).transform(df)
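For intuition, the mean imputation that Imputer performs can be sketched in plain Python, without Spark (the toy data below is made up for illustration):

```python
# Toy illustration of mean imputation; data and column names are hypothetical.
rows = [
    {"a": 1.0, "b": None},
    {"a": None, "b": 4.0},
    {"a": 3.0, "b": 6.0},
]
cols = ["a", "b"]

# Per-column mean over the non-missing entries only.
means = {}
for c in cols:
    vals = [r[c] for r in rows if r[c] is not None]
    means[c] = sum(vals) / len(vals)

# Append "<col>_imputed" columns, mirroring setOutputCols above.
imputed = [
    {**r, **{c + "_imputed": (means[c] if r[c] is None else r[c]) for c in cols}}
    for r in rows
]
print(imputed[1]["a_imputed"])  # → 2.0, the mean of 1.0 and 3.0
```

Note that the original columns are kept and the filled values land in new `*_imputed` columns, which is exactly what setOutputCols / outputCols configures above.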
火花< 2.2
Here you go:
import org.apache.spark.sql.functions.mean

df.na.fill(df.columns.zip(
  df.select(df.columns.map(mean(_)): _*).first.toSeq
).toMap)
where
df.columns.map(mean(_)): Array[Column]
computes the average for each column,
df.select(_: _*).first.toSeq: Seq[Any]
collects aggregated values and converts row to Seq[Any]
(I know it is suboptimal but this is the API we have to work with),
df.columns.zip(_).toMap: Map[String,Any]
creates a Map[String, Any]
which maps each column name to its average, and finally:
df.na.fill(_): DataFrame
fills the missing values using:
fill: Map[String, Any] => DataFrame
from DataFrameNaFunctions.
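Put together, the fill step amounts to building a column-to-mean map and substituting it for nulls. A minimal plain-Python sketch of the same three steps (hypothetical data, no Spark):

```python
# Mimics the aggregate → zip → na.fill pipeline without Spark; toy data.
rows = [
    {"x": 10.0, "y": None},
    {"x": None, "y": 2.0},
    {"x": 20.0, "y": 4.0},
]
columns = ["x", "y"]

# Step 1: aggregate per-column means, like df.select(columns.map(mean)).first.
agg = [
    sum(r[c] for r in rows if r[c] is not None) /
    sum(1 for r in rows if r[c] is not None)
    for c in columns
]

# Step 2: zip column names with the aggregated row, like columns.zip(...).toMap.
fill_map = dict(zip(columns, agg))

# Step 3: fill missing values from the map, like df.na.fill(fill_map).
filled = [{c: (fill_map[c] if r[c] is None else r[c]) for c in columns}
          for r in rows]
print(fill_map)  # → {'x': 15.0, 'y': 3.0}
```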
To ignore NaN
entries you can replace:
df.select(df.columns.map(mean(_)): _*).first.toSeq
with:
import org.apache.spark.sql.functions.{col, isnan, when}

df.select(df.columns.map(
  c => mean(when(!isnan(col(c)), col(c)))
): _*).first.toSeq
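The effect of the isnan guard can likewise be sketched in plain Python: a NaN entry propagates through an unguarded mean, while excluding NaN first recovers the mean of the valid values (toy data, no Spark):

```python
import math

# One NaN makes a naive column mean NaN.
col = [1.0, float("nan"), 3.0]
naive_mean = sum(col) / len(col)  # NaN propagates through the sum

# Dropping NaN first mirrors mean(when(!isnan(c), c)).
clean = [v for v in col if not math.isnan(v)]
guarded_mean = sum(clean) / len(clean)

print(math.isnan(naive_mean), guarded_mean)  # → True 2.0
```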