用平均值替换缺失值 - Spark Dataframe [英] Replace missing values with mean - Spark Dataframe

查看:41
本文介绍了用平均值替换缺失值 - Spark Dataframe的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个包含一些缺失值的 Spark 数据框.我想通过用该列的平均值替换缺失值来执行简单的插补.我对 Spark 很陌生,所以我一直在努力实现这个逻辑.到目前为止,这是我设法做到的:

I have a Spark Dataframe with some missing values. I would like to perform a simple imputation by replacing the missing values with the mean for that column. I am very new to Spark, so I have been struggling to implement this logic. This is what I have managed to do so far:

a) 要为单个列(假设 Col A)执行此操作,这行代码似乎有效:

a) To do this for a single column (let's say Col A), this line of code seems to work:

df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
  .first()(0).asInstanceOf[Double])
  .otherwise($"ColA"))

b) 但是,我无法弄清楚如何对数据框中的所有列执行此操作.我正在尝试 Map 函数,但我相信它会遍历数据帧的每一行

b) However, I have not been able to figure out, how to do this for all the columns in my dataframe. I was trying out the Map function, but I believe it loops through each row of a dataframe

c) SO 上有一个类似的问题 - 这里.虽然我喜欢这个解决方案(使用聚合表和合并),但我非常想知道是否有办法通过循环遍历每一列来做到这一点(我来自 R,所以使用更高阶的函数循环遍历每一列,如lapply 对我来说似乎更自然).

c) There is a similar question on SO - here. And while I liked the solution (using Aggregated tables and coalesce), I was very keen to know if there is a way to do this by looping through each column (I come from R, so looping through each column using a higher order functional like lapply seems more natural to me).

谢谢!

推荐答案

Spark >= 2.2

您可以使用 org.apache.spark.ml.feature.Imputer(支持均值和中值策略).

You can use org.apache.spark.ml.feature.Imputer (which supports both mean and median strategy).

Scala:

import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer()
  .setInputCols(df.columns)
  .setOutputCols(df.columns.map(c => s"${c}_imputed"))
  .setStrategy("mean")

imputer.fit(df).transform(df)

Python:

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=df.columns, 
    outputCols=["{}_imputed".format(c) for c in df.columns]
)
imputer.fit(df).transform(df)

火花<2.2

给你:

import org.apache.spark.sql.functions.mean

df.na.fill(df.columns.zip(
  df.select(df.columns.map(mean(_)): _*).first.toSeq
).toMap)

哪里

df.columns.map(mean(_)): Array[Column] 

计算每一列的平均值,

df.select(_: *).first.toSeq: Seq[Any]

收集聚合值并将行转换为 Seq[Any](我知道这是次优的,但这是我们必须使用的 API),

collects aggregated values and converts row to Seq[Any] (I know it is suboptimal but this is the API we have to work with),

df.columns.zip(_).toMap: Map[String,Any] 

创建aMap: Map[String, Any],从列名映射到其平均值,最后:

creates aMap: Map[String, Any] which maps from the column name to its average, and finally:

df.na.fill(_): DataFrame

使用以下方法填充缺失值:

fills the missing values using:

fill: Map[String, Any] => DataFrame 

来自 DataFrameNaFunctions.

要输入 NaN 条目,您可以替换:

To ingore NaN entries you can replace:

df.select(df.columns.map(mean(_)): _*).first.toSeq

与:

import org.apache.spark.sql.functions.{col, isnan, when}


df.select(df.columns.map(
  c => mean(when(!isnan(col(c)), col(c)))
): _*).first.toSeq

这篇关于用平均值替换缺失值 - Spark Dataframe的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆