Replace the not-null values of a Spark DataFrame with "1" using Scala in an optimized way
Problem description
I have an input Spark DataFrame named freq:
+---------------+----+----+----+----+
|Main_CustomerID| A1| A2| A3| A4|
+---------------+----+----+----+----+
| 101|null| 2| 1|null|
| 102| 2|null| 2| 4|
| 103| 1| 2|null| 3|
| 104| 2|null| 3|null|
+---------------+----+----+----+----+
Wherever a value in the DataFrame is not null, I need to replace it with 1. I have done this using Scala like this:
import org.apache.spark.sql.functions.{col, when}

// Check every column except the first (Main_CustomerID).
val cols = freq.columns.drop(1)
var newfreq = freq
// One withColumn call per column: replace non-null values with 1.
for (column <- cols) {
  newfreq = newfreq.withColumn(column, when(col(column).isNotNull, 1).otherwise(col(column)))
}
And I got the resultant DataFrame, named newfreq, as:
+---------------+----+----+----+----+
|Main_CustomerID| A1| A2| A3| A4|
+---------------+----+----+----+----+
| 101|null| 1| 1|null|
| 102| 1|null| 1| 1|
| 103| 1| 1|null| 1|
| 104| 1|null| 1|null|
+---------------+----+----+----+----+
But is there any way to replace this for loop with a more optimized solution?
Recommended answer
Here is a more optimized way to do it:
import org.apache.spark.sql.functions._

// Build the whole projection in a single select: keep the key column
// as-is and wrap every other column in one when/otherwise expression.
// (This answer's test data names the key column "id"; with the
// question's data it would be "Main_CustomerID".)
val cols = freq.columns.drop(1).toSeq
val selections = Seq(col("id")) ++ cols.map(c => when(col(c).isNotNull, lit(1)).otherwise(col(c)).alias(c))
val freq2 = freq.select(selections : _*)
freq2.show
// +---+----+----+----+----+
// | id| a1| a2| a3| a4|
// +---+----+----+----+----+
// |101|null| 1| 1|null|
// |102| 1|null| 1| 1|
// |103| 1| 1|null| 1|
// |104| 1|null| 1|null|
// +---+----+----+----+----+
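The plans below come from a CSV source (hence the FileScan csv node). If you want to reproduce freq in memory instead, here is a minimal sketch, assuming a local SparkSession and this answer's column names (id, a1..a4); its plan will then show a local relation rather than a FileScan:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("freq-demo").getOrCreate()
import spark.implicits._

// Option[Int] encodes the nullable integer columns.
val freq = Seq(
  (101, Option.empty[Int], Option(2), Option(1), Option.empty[Int]),
  (102, Option(2), Option.empty[Int], Option(2), Option(4)),
  (103, Option(1), Option(2), Option.empty[Int], Option(3)),
  (104, Option(2), Option.empty[Int], Option(3), Option.empty[Int])
).toDF("id", "a1", "a2", "a3", "a4")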
You can compare the execution plans of the two approaches:
scala> newfreq.explain(true)
== Parsed Logical Plan ==
'Project [id#10, a1#20, a2#26, a3#32, CASE WHEN isnotnull('a4) THEN 1 ELSE 'a4 END AS a4#38]
+- AnalysisBarrier
+- Project [id#10, a1#20, a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, a4#14]
+- Project [id#10, a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, a3#13, a4#14]
+- Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, a2#12, a3#13, a4#14]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Analyzed Logical Plan ==
id: int, a1: int, a2: int, a3: int, a4: int
Project [id#10, a1#20, a2#26, a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- Project [id#10, a1#20, a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, a4#14]
+- Project [id#10, a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, a3#13, a4#14]
+- Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, a2#12, a3#13, a4#14]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Optimized Logical Plan ==
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Physical Plan ==
*(1) Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- *(1) FileScan csv [id#10,a1#11,a2#12,a3#13,a4#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../test.data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,a1:int,a2:int,a3:int,a4:int>
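Note the chain of nested Project nodes in the parsed plan above, one per withColumn call; Catalyst collapses them into a single Project in the optimized plan.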
scala> freq2.explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('id, None), CASE WHEN isnotnull('a1) THEN 1 ELSE 'a1 END AS a1#46, CASE WHEN isnotnull('a2) THEN 1 ELSE 'a2 END AS a2#47, CASE WHEN isnotnull('a3) THEN 1 ELSE 'a3 END AS a3#48, CASE WHEN isnotnull('a4) THEN 1 ELSE 'a4 END AS a4#49]
+- AnalysisBarrier
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Analyzed Logical Plan ==
id: int, a1: int, a2: int, a3: int, a4: int
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Optimized Logical Plan ==
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv
== Physical Plan ==
*(1) Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- *(1) FileScan csv [id#10,a1#11,a2#12,a3#13,a4#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../test.data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,a1:int,a2:int,a3:int,a4:int>
The optimized logical plans are the same for both, but the single select is a cleaner way to write it.
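If you prefer the per-column loop but want to avoid the mutable var, a foldLeft rewrite works too (a sketch, not part of the original answer). As the plans above show, Catalyst collapses the nested projections either way, so the single select mainly avoids building them in the first place:

import org.apache.spark.sql.functions.{col, lit, when}

// Fold over the columns, threading the DataFrame through each withColumn call.
val newfreq2 = freq.columns.drop(1).foldLeft(freq) { (df, c) =>
  df.withColumn(c, when(col(c).isNotNull, lit(1)).otherwise(col(c)))
}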