Spark Scala row-wise average by handling null


Problem description

I have a dataframe with a high volume of data and "n" number of columns.

df_avg_calc: org.apache.spark.sql.DataFrame = [col1: double, col2: double ... 4 more fields]
+------------------+-----------------+------------------+-----------------+-----+-----+
|              col1|             col2|              col3|             col4| col5| col6|
+------------------+-----------------+------------------+-----------------+-----+-----+
|              null|             null|              null|             null| null| null|
|              14.0|              5.0|              73.0|             null| null| null|
|              null|             null|             28.25|             null| null| null|
|              null|             null|              null|             null| null| null|
|33.723333333333336|59.78999999999999|39.474999999999994|82.09666666666666|101.0|53.43|
|             26.25|             null|              null|              2.0| null| null|
|              null|             null|              null|             null| null| null|
|             54.46|           89.475|              null|             null| null| null|
|              null|            12.39|              null|             null| null| null|
|              null|             58.0|             19.45|              1.0| 1.33|158.0|
+------------------+-----------------+------------------+-----------------+-----+-----+

I need to perform a row-wise average, keeping in mind not to consider cells that are "null" for averaging.

This needs to be implemented in Spark / Scala. I have tried to explain the same in the attached image. For example, for the second row above the expected value is (14.0 + 5.0 + 73.0) / 3 ≈ 30.67, since the three null cells are excluded from both the sum and the count.

What I have tried so far:

By referring to Calculate row mean, ignoring NAs in Spark Scala:

// Select the columns to average by name, sum them, and divide by the number of columns
val df = df_raw.schema.fieldNames.filter(f => f.contains("colname"))
val rowMeans = df_raw.select(df.map(f => col(f)).reduce(_ + _) / lit(df.length) as "row_mean")

Where df_raw contains the columns that need to be aggregated (row-wise, of course). There are more than 80 columns. They arbitrarily hold data or null, and the count of nulls needs to be ignored in the denominator while calculating the average. The code works fine when all columns contain data, but even a single null in a column makes the result null, because in Spark SQL any arithmetic involving a null operand evaluates to null.
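
A minimal illustration of that null propagation (a sketch assuming a live SparkSession named spark, as in spark-shell):

import org.apache.spark.sql.functions.lit

// Any arithmetic with a null operand evaluates to null, so the folded sum
// col1 + col2 + ... is null as soon as one of the columns is null.
spark.range(1)
  .select((lit(14.0) + lit(null).cast("double")).as("sum"))
  .show()
// +----+
// | sum|
// +----+
// |null|
// +----+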

Edit:

I have tried by

Recommended answer

Spark >= 2.4

You can use aggregate:

val row_mean = expr("""aggregate(
  CAST(array(_1, _2, _3) AS array<double>), 
  -- Initial value
  -- Note that aggregate is picky about types
  CAST((0.0 as sum, 0.0 as n) AS struct<sum: double, n: double>), 
  -- Merge function
  (acc, x) -> (
    acc.sum + coalesce(x, 0.0), 
    acc.n + CASE WHEN x IS NULL THEN 0.0 ELSE 1.0 END), 
  -- Finalize function
  acc -> acc.sum / acc.n)""")

Usage:

df.withColumn("row_mean", row_mean).show

Result:

+----+----+----+--------+
|  _1|  _2|  _3|row_mean|
+----+----+----+--------+
|null|null|null|    null|
| 2.0|null|null|     2.0|
|50.0|34.0|null|    42.0|
| 1.0| 2.0| 3.0|     2.0|
+----+----+----+--------+
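
The expression above hard-codes the three columns _1, _2 and _3. For the 80+ columns in the question, the same expression can be generated from a list of column names; a minimal sketch, assuming plain column names that can safely be cast to double (rowMeanExpr is an illustrative helper):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.expr

def rowMeanExpr(colNames: Seq[String]): Column = expr(s"""aggregate(
  CAST(array(${colNames.mkString(", ")}) AS array<double>),
  CAST((0.0 as sum, 0.0 as n) AS struct<sum: double, n: double>),
  (acc, x) -> (
    acc.sum + coalesce(x, 0.0),
    acc.n + CASE WHEN x IS NULL THEN 0.0 ELSE 1.0 END),
  acc -> acc.sum / acc.n)""")

// e.g. df_raw.withColumn("row_mean", rowMeanExpr(df_raw.columns.toSeq))

For an all-null row the accumulator ends at 0.0 / 0.0, which Spark SQL evaluates to null rather than raising an error, as the first row of the result shows.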

Version independent

Compute the sum and count of the NOT NULL columns, and divide one by the other:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

def row_mean(cols: Column*) = {
  // Sum of values ignoring nulls
  val sum = cols
    .map(c => coalesce(c, lit(0)))
    .foldLeft(lit(0))(_ + _)
  // Count of not null values
  val cnt = cols
    .map(c => when(c.isNull, 0).otherwise(1))
    .foldLeft(lit(0))(_ + _)
  sum / cnt
}

Example data:

val df = Seq(
  (None, None, None), 
  (Some(2.0), None, None),
  (Some(50.0), Some(34.0), None),
  (Some(1.0), Some(2.0), Some(3.0))
).toDF

Result:

df.withColumn("row_mean", row_mean($"_1", $"_2", $"_3")).show

+----+----+----+--------+
|  _1|  _2|  _3|row_mean|
+----+----+----+--------+
|null|null|null|    null|
| 2.0|null|null|     2.0|
|50.0|34.0|null|    42.0|
| 1.0| 2.0| 3.0|     2.0|
+----+----+----+--------+
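
To apply this to the question's df_raw, the dynamically selected columns can be passed to row_mean as varargs; a short sketch reusing the filter predicate from the question (the "colname" substring is the question's placeholder):

import org.apache.spark.sql.functions.col

val avgCols = df_raw.schema.fieldNames.filter(f => f.contains("colname"))
val withRowMean = df_raw.withColumn("row_mean", row_mean(avgCols.map(col): _*))

As with the SQL variant, rows in which every selected column is null divide 0 by 0 and therefore end up with a null row_mean.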
