How to deal with null values in spark reduceByKey function?
Problem description
I have a Spark DataFrame (df) which looks like this:
+----------+--------+----------+--------+
|        c1|      c2|        c3|      c4|
+----------+--------+----------+--------+
|         1|       5|      null|       7|
|         1|       5|         4|       8|
|         1|       3|      null|      11|
|         1|       3|      null|    null|
|         2|       6|        23|      17|
|         2|       6|         7|       3|
|         2|       3|      null|      11|
|         2|       3|      null|      17|
+----------+--------+----------+--------+
I want to aggregate using (c1, c2) as the key and take the average of c3 and c4, so that I get this:
+----------+--------+----------+--------+
|        c1|      c2|        c3|      c4|
+----------+--------+----------+--------+
|         1|       5|         4|     7.5|
|         1|       3|      null|      11|
|         2|       6|        15|      10|
|         2|       3|      null|      14|
+----------+--------+----------+--------+
So, essentially I am ignoring the null values.
My half-baked code looks like this:
val df1 = df.
  // just working on c3 for time being
  map(x => ((x.getInt(0), x.getInt(1)), x.getDouble(3))).
  reduceByKey(
    (x, y) => {
      var temp = 0
      var sum = 0.0
      var flag = false
      if (x == null) {
        if (y != null) {
          temp = temp + 1
          sum = y
          flag = true
        }
      } else {
        if (y == null) {
          temp = temp + 1
          sum = x
        } else {
          temp = temp + 1
          sum = x + y
          flag = true
        }
      }
      if (flag == false) {
        null
      } else {
        sum / temp
      }
    }
  )
Obviously, the above code is not working. Any help to make the code work is very much appreciated.
Edit 1: The answer given by @zero232 is a solution. However, it is not "the solution" I am looking for. My interest is to understand how to deal with null values when writing a custom function for reduceByKey(). I am re-asking the question below:
I want to aggregate using (c1, c2) as the key and take the root mean square [{sum(a_i^2)}^0.5] (or some other function that is not available in Spark, for that matter) of c3 and c4 while ignoring the null values, so that I get this:
+----------+--------+----------+--------+
|        c1|      c2|        c3|      c4|
+----------+--------+----------+--------+
|         1|       5|         4|   10.63|
|         1|       3|      null|      11|
|         2|       6|     24.04|   17.26|
|         2|       3|      null|   20.24|
+----------+--------+----------+--------+
Answer
Just groupBy and use mean:
df.groupBy("c1", "c2").mean("c3", "c4")
or agg:
df.groupBy("c1", "c2").agg(avg("c3"), avg("c4"))
Typically, all primitive functions on DataFrames handle null values correctly, so a custom aggregate such as the root mean square can be composed from them:
import org.apache.spark.sql.functions._
def rms(c: String) = sqrt(avg(pow(col(c), 2))).alias(s"rms($c)")
df.groupBy("c1", "c2").agg(rms("c3"), rms("c4"))
If you want to ignore null with RDDs, just filter these out before you apply the reduction:
somePairRDD.filter(_._2 != null)
  .foldByKey(someDefaultValue)(someReducingFunction)
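Applied to the question, a sketch for the c3 column only (assuming c1 and c2 are Int and c3 a nullable Double, as in the tables above) can key by (c1, c2), drop the null values up front, and reduce the squares:

import math.sqrt

// Rows with a null c3 are dropped before the reduction, so the reduce
// function never sees a null.
val rssC3 = df.rdd
  .flatMap { r =>
    if (r.isNullAt(2)) None
    else Some(((r.getInt(0), r.getInt(1)), r.getDouble(2)))
  }
  .mapValues(v => v * v)   // square each non-null value
  .reduceByKey(_ + _)      // sum of squares per (c1, c2)
  .mapValues(sqrt)         // {sum(a_i^2)}^0.5

One caveat: keys whose values were all null simply drop out of this result instead of appearing as null; the Option version below keeps them.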
or convert values to Option and use pattern matching:
somePairRDD.mapValues(Option(_)).reduceByKey {
  case (Some(x), Some(y)) => doSomething(x, y)
  case (Some(x), None)    => doSomething(x)
  case (None,    Some(y)) => doSomething(y)
  case _                  => someDefault
}
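For the question's {sum(a_i^2)}^0.5 on c3, the Option route might look like the sketch below (the value and variable names are assumed): the squares are combined inside reduceByKey, and a key whose values were all null comes out as None, mirroring the null in the expected output.

import math.sqrt

// Values become Option[Double] holding the square of c3 (None where c3 is null),
// so the reduce function only ever combines defined values.
val rmsC3 = df.rdd
  .map(r => ((r.getInt(0), r.getInt(1)),
             if (r.isNullAt(2)) None else Some(r.getDouble(2) * r.getDouble(2))))
  .reduceByKey {
    case (Some(x), Some(y)) => Some(x + y)
    case (Some(x), None)    => Some(x)
    case (None,    Some(y)) => Some(y)
    case _                  => None
  }
  .mapValues(_.map(sqrt))   // take the root at the end; None stays None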
or use map / flatMap / getOrElse and other standard tools to handle undefined values.
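For instance (reusing the placeholder names from the snippets above), flatMapValues with Option drops the nulls before the reduce ever runs:

// Option(null) is None, which flatMapValues discards, so only defined values reach the reduce.
somePairRDD.flatMapValues(Option(_)).reduceByKey(someReducingFunction)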