How to deal with null values in spark reduceByKey function?

Problem description

I have a spark DataFrame (df) which looks like this:

+----------+--------+----------+--------+                                                                 
|        c1|      c2|        c3|      c4|
+----------+--------+----------+--------+
|      1   |    5   |      null|    7   |
+----------+--------+----------+--------+
|      1   |    5   |      4   |    8   |
+----------+--------+----------+--------+
|      1   |    3   |      null|   11   |
+----------+--------+----------+--------+
|      1   |    3   |      null| null   |
+----------+--------+----------+--------+
|      2   |    6   |      23  |   17   |
+----------+--------+----------+--------+
|      2   |    6   |      7   |    3   |
+----------+--------+----------+--------+
|      2   |    3   |      null|   11   |
+----------+--------+----------+--------+
|      2   |    3   |      null|   17   |
+----------+--------+----------+--------+
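
For anyone reproducing this, a minimal sketch of how such a DataFrame could be constructed (an assumption: a SparkSession named spark is in scope, c1/c2 are integers, and c3/c4 are nullable doubles, with None becoming a SQL null):

import spark.implicits._   // assumes a SparkSession named `spark`

// Hypothetical reconstruction of the sample data shown above.
val df = Seq[(Int, Int, Option[Double], Option[Double])](
  (1, 5, None,       Some(7.0)),
  (1, 5, Some(4.0),  Some(8.0)),
  (1, 3, None,       Some(11.0)),
  (1, 3, None,       None),
  (2, 6, Some(23.0), Some(17.0)),
  (2, 6, Some(7.0),  Some(3.0)),
  (2, 3, None,       Some(11.0)),
  (2, 3, None,       Some(17.0))
).toDF("c1", "c2", "c3", "c4")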

I want to aggregate using (c1, c2) as key and have the average of c3 and c4, so that I have this:

+----------+--------+----------+--------+                                                                 
|        c1|      c2|        c3|      c4|
+----------+--------+----------+--------+
|      1   |    5   |      4   |  7.5   |
+----------+--------+----------+--------+
|      1   |    3   |      null|   11   |
+----------+--------+----------+--------+
|      2   |    6   |      15  |    10  |
+----------+--------+----------+--------+
|      2   |    3   |      null|   14   |
+----------+--------+----------+--------+

So, essentially, I am ignoring the null values.

My half-baked code looks like this:

val df1 = df.
          // just working on c3 for time being
          map(x => ((x.getInt(0), x.getInt(1)), x.getDouble(3))).
          reduceByKey(
            (x, y) => {
              var temp = 0
              var sum = 0.0
              var flag = false
              if (x == null) {
                if (y != null) {
                  temp = temp + 1
                  sum = y
                  flag = true
                }
              } else {
                if (y == null) {
                  temp = temp + 1
                  sum = x
                } else {
                  temp = temp + 1
                  sum = x + y
                  flag = true
                }
              }
              if (flag == false) {
                null
              } else {
                sum / temp
              }
            }
          )

Obviously, the above code is not working. Any help to make the code work is very much appreciated.

Edit 1: The answer given by @zero232 is a solution. However, it is not "the solution" I am looking for. My interest was to understand how to deal with null values when writing a custom function for reduceByKey(). I am re-asking the question below:

I want to aggregate using (c1, c2) as key and have the root mean square [{sum(a_i^2)}^0.5] (or some function which is not available in spark, for that matter) of c3 and c4 while ignoring the null values, so that I have this:

+----------+--------+----------+--------+                                                                 
|        c1|      c2|        c3|      c4|
+----------+--------+----------+--------+
|      1   |    5   |      4   | 10.63  |
+----------+--------+----------+--------+
|      1   |    3   |      null|   11   |
+----------+--------+----------+--------+
|      2   |    6   |   24.04  |  17.26 |
+----------+--------+----------+--------+
|      2   |    3   |      null| 20.24  |
+----------+--------+----------+--------+

Recommended answer

Just groupBy and use mean:

df.groupBy("c1", "c2").mean("c3", "c4")

or agg:

df.groupBy("c1", "c2").agg(avg("c3"), avg("c4"))
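
A slightly fuller sketch that keeps the original column names via alias, so the result lines up with the expected table (the alias calls and the result name are additions for illustration):

import org.apache.spark.sql.functions.avg

// avg skips nulls; a group whose values are all null yields null.
val result = df.groupBy("c1", "c2").agg(
  avg("c3").alias("c3"),
  avg("c4").alias("c4")
)
result.show()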

Typically, all primitive functions on DataFrames will handle null values correctly.

import org.apache.spark.sql.functions._

def rms(c: String) = sqrt(avg(pow(col(c), 2))).alias(s"rms($c)")
df.groupBy("c1", "c2").agg(rms("c3"), rms("c4"))
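
Note that the figures in the expected table from the edit correspond to the square root of the sum of squares (per the bracketed formula), not of their mean; assuming that is the intent, a hypothetical variant of the helper could look like this:

// Hypothetical variant: sqrt(sum(x^2)) instead of sqrt(avg(x^2)).
// sum and pow also ignore nulls, so all-null groups stay null.
def rootSumSquare(c: String) = sqrt(sum(pow(col(c), 2))).alias(s"rss($c)")

df.groupBy("c1", "c2").agg(rootSumSquare("c3"), rootSumSquare("c4"))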

If you want to ignore nulls with RDDs, just filter them out before you apply the reduction:

somePairRDD.filter(_._2 != null)
  .foldByKey(someDefaultValue)(someReducingFunction)
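
As a concrete sketch of this pattern (all names are hypothetical, and it assumes c1/c2 are integers and c3 is a nullable double, as in the sample df above), the root of the sum of squares of c3 per key could be computed like this:

import org.apache.spark.rdd.RDD

// Hypothetical pair RDD derived from the DataFrame; the c3 value may be null.
val pairs: RDD[((Int, Int), java.lang.Double)] = df.rdd.map { r =>
  val c3 = if (r.isNullAt(2)) null else java.lang.Double.valueOf(r.getDouble(2))
  ((r.getInt(0), r.getInt(1)), c3)
}

val rootSumSquares = pairs
  .filter(_._2 != null)                          // drop nulls before reducing
  .mapValues(v => v.doubleValue * v.doubleValue)
  .foldByKey(0.0)(_ + _)                         // sum of squares per key
  .mapValues(math.sqrt)                          // root of the sum of squares
// Note: keys whose values are all null are dropped entirely here,
// unlike the DataFrame version where they remain with a null result.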

or convert the values to Option and use pattern matching:

somePairRDD.mapValues(Option(_)).reduceByKey {
  case (Some(x), Some(y)) => doSomething(x, y)
  case (Some(x), _) => doSomething(x)
  case (_, Some(y)) => doSomething(y)
  case _ => someDefault
}
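
For the original goal of an average that ignores nulls, one way to use this pattern is to carry a (sum, count) pair through reduceByKey; a sketch under the same assumptions as the pairs RDD above (names are hypothetical):

// `pairs` is the ((c1, c2), java.lang.Double) RDD sketched earlier; values may be null.
val averages = pairs
  .mapValues(Option(_))                          // null becomes None before the reduce
  .mapValues {
    case Some(v) => (v.doubleValue, 1L)          // a real value contributes to sum and count
    case None    => (0.0, 0L)                    // a null contributes nothing
  }
  .reduceByKey { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }
  .mapValues { case (s, n) => if (n == 0L) None else Some(s / n) }  // all-null group -> None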

or use map / flatMap / getOrElse and other standard tools to handle undefined values.
