How to deal with null values in spark reduceByKey function?


Question

I have a Spark DataFrame (df) which looks like this:

+----------+--------+----------+--------+
|        c1|      c2|        c3|      c4|
+----------+--------+----------+--------+
|      1   |    5   |      null|    7   |
|      1   |    5   |      4   |    8   |
|      1   |    3   |      null|   11   |
|      1   |    3   |      null| null   |
|      2   |    6   |      23  |   17   |
|      2   |    6   |      7   |    3   |
|      2   |    3   |      null|   11   |
|      2   |    3   |      null|   17   |
+----------+--------+----------+--------+

I want to aggregate using (c1,c2) as key and have the average of c3 and c4, so that I have this:

+----------+--------+----------+--------+
|        c1|      c2|        c3|      c4|
+----------+--------+----------+--------+
|      1   |    5   |      4   |  7.5   |
|      1   |    3   |      null|   11   |
|      2   |    6   |      15  |    10  |
|      2   |    3   |      null|   14   |
+----------+--------+----------+--------+

So, essentially I am ignoring the null values.

My half-baked code looks something like this:

val df1 = df.
  // just working on c3 for time being
  map(x => ((x.getInt(0), x.getInt(1)), x.getDouble(3))).
  reduceByKey(
    (x, y) => {
      var temp = 0
      var sum = 0.0
      var flag = false
      if (x == null) {
        if (y != null) {
          temp = temp + 1
          sum = y
          flag = true
        }
      } else {
        if (y == null) {
          temp = temp + 1
          sum = x
        } else {
          temp = temp + 1
          sum = x + y
          flag = true
        }
      }
      if (flag == false) {
        null
      } else {
        sum / temp
      }
    }
  )

Obviously, the above code is not working. Any help to make the code work is very much appreciated.

Edit 1: The answer given by @zero232 is a solution. However, it is not "the solution" I am looking for. My interest was to understand how to deal with null values when writing a custom function for reduceByKey(). I am re-asking the question below:

I want to aggregate using (c1,c2) as key and have root mean square [{sum(a_i^2)}^0.5] (or some function which is not available in spark for that matter) of c3 and c4 while ignoring the null values, so that I have this:

+----------+--------+----------+--------+
|        c1|      c2|        c3|      c4|
+----------+--------+----------+--------+
|      1   |    5   |      4   | 10.63  |
|      1   |    3   |      null|   11   |
|      2   |    6   |   24.04  |  17.26 |
|      2   |    3   |      null| 20.24  |
+----------+--------+----------+--------+

Answer

Just groupBy and use mean:

df.groupBy("c1", "c2").mean("c3", "c4")

or agg:

df.groupBy("c1", "c2").agg(avg("c3"), avg("c4"))

Typically, all primitive functions on DataFrames will correctly handle null values. For the root mean square asked about in Edit 1, you can build it from the built-in aggregate functions:

import org.apache.spark.sql.functions._

def rms(c: String) = sqrt(avg(pow(col(c), 2))).alias(s"rms($c)")
df.groupBy("c1", "c2").agg(rms("c3"), rms("c4"))

If you want to ignore nulls with RDDs, just filter them out before you apply the reduction:

somePairRDD.filter(_._2 != null)
  .foldByKey(someDefaultValue)(someReducingFunction)
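
For the original question (per-key averages while ignoring nulls), a sketch of this filter-first approach for c4 might look like the following. It assumes c4 is read as a Double, as in the question's own code, and uses reduceByKey over a (sum, count) pair instead of foldByKey:

val c4Avg = df.rdd
  .filter(row => !row.isNullAt(3))                 // drop rows where c4 is null
  .map(row => ((row.getInt(0), row.getInt(1)), (row.getDouble(3), 1L)))
  .reduceByKey { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }
  .mapValues { case (sum, count) => sum / count }  // average of the non-null values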

or convert values to Option and use pattern matching:

somePairRDD.mapValues(Option(_)).reduceByKey {
  case (Some(x), Some(y)) => doSomething(x, y)
  case (Some(x), _) => doSomething(x)
  case (_, Some(y)) => doSomething(y)
  case _ => someDefault
}
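
Applied to Edit 1, a sketch of this Option pattern that computes the root of the sum of squares ({sum(a_i^2)}^0.5) of c3 per (c1, c2) key while ignoring nulls; again this assumes c3 is read as a Double, and a group that is entirely null stays None (i.e. null in the expected output):

val c3RootSumSq = df.rdd
  .map { row =>
    val key = (row.getInt(0), row.getInt(1))
    val c3  = if (row.isNullAt(2)) None else Some(row.getDouble(2))
    (key, c3.map(v => v * v))                      // square the value, keep None as-is
  }
  .reduceByKey {
    case (Some(x), Some(y)) => Some(x + y)         // both present: add the squares
    case (Some(x), None)    => Some(x)             // ignore the missing side
    case (None, Some(y))    => Some(y)
    case (None, None)       => None                // an all-null group stays None
  }
  .mapValues(_.map(math.sqrt))                     // square root of the summed squares

Checking against the expected table: for key (1, 5) the only non-null c3 is 4, giving sqrt(16) = 4, and for (2, 6) it is sqrt(23^2 + 7^2) ≈ 24.04.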

or use map / flatMap / getOrElse and other standard tools to handle undefined values.
