Replace Not null values of Spark dataframe as "1" using Scala in optimized way

Question

I have an input Spark dataframe named freq:

+---------------+----+----+----+----+
|Main_CustomerID|  A1|  A2|  A3|  A4|
+---------------+----+----+----+----+
|            101|null|   2|   1|null|
|            102|   2|null|   2|   4|
|            103|   1|   2|null|   3|
|            104|   2|null|   3|null|
+---------------+----+----+----+----+

Wherever a value in the dataframe is not null, I need to replace it with 1. I have done this using Scala like this:

import org.apache.spark.sql.functions.{col, when}

// Rewrite every non-key column: non-null values become 1, nulls are kept.
val cols = freq.columns.drop(1)
var newfreq = freq
for (column <- cols) {
  newfreq = newfreq.withColumn(column, when(col(column).isNotNull, 1).otherwise(col(column)))
}
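
For reference, the same loop can be written without the mutable var by folding over the columns; a minimal sketch (newfreqFold is just an illustrative name) that builds exactly the same chain of projections:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}

// Fold over the non-key columns, rewriting each one: non-null values become 1.
val newfreqFold: DataFrame = freq.columns.drop(1).foldLeft(freq) { (df, c) =>
  df.withColumn(c, when(col(c).isNotNull, lit(1)).otherwise(col(c)))
}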

This gives me the resulting dataframe newfreq:

+---------------+----+----+----+----+
|Main_CustomerID|  A1|  A2|  A3|  A4|
+---------------+----+----+----+----+
|            101|null|   1|   1|null|
|            102|   1|null|   1|   1|
|            103|   1|   1|null|   1|
|            104|   1|null|   1|null|
+---------------+----+----+----+----+

But is there any way to replace this for loop with a more optimized solution?

Answer

Here is a more optimized way to do it:

import org.apache.spark.sql.functions._

// Build every projected column up front, then apply a single select.
val cols = freq.columns.drop(1).toSeq
val selections = Seq(col("id")) ++ cols.map(c => when(col(c).isNotNull, lit(1)).otherwise(col(c)).alias(c))
val freq2 = freq.select(selections : _*)
freq2.show
// +---+----+----+----+----+
// | id|  a1|  a2|  a3|  a4|
// +---+----+----+----+----+
// |101|null|   1|   1|null|
// |102|   1|null|   1|   1|
// |103|   1|   1|null|   1|
// |104|   1|null|   1|null|
// +---+----+----+----+----+
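
Note that this snippet assumes a key column named id (the answer's test data); to run it directly against the freq dataframe from the question, whose key column is Main_CustomerID, you could take the key column from the dataframe itself. A minimal sketch (freqOnes is just an illustrative name):

import org.apache.spark.sql.functions.{col, lit, when}

// Keep the first column untouched and rewrite all remaining columns in a single select.
val keyCol    = freq.columns.head
val rewritten = freq.columns.drop(1).map(c =>
  when(col(c).isNotNull, lit(1)).otherwise(col(c)).alias(c))
val freqOnes  = freq.select((col(keyCol) +: rewritten): _*)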

You can compare the execution plans of both:

scala> newfreq.explain(true)
== Parsed Logical Plan ==
'Project [id#10, a1#20, a2#26, a3#32, CASE WHEN isnotnull('a4) THEN 1 ELSE 'a4 END AS a4#38]
+- AnalysisBarrier
      +- Project [id#10, a1#20, a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, a4#14]
         +- Project [id#10, a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, a3#13, a4#14]
            +- Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, a2#12, a3#13, a4#14]
               +- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Analyzed Logical Plan ==
id: int, a1: int, a2: int, a3: int, a4: int
Project [id#10, a1#20, a2#26, a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- Project [id#10, a1#20, a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, a4#14]
   +- Project [id#10, a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, a3#13, a4#14]
      +- Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, a2#12, a3#13, a4#14]
         +- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Optimized Logical Plan ==
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Physical Plan ==
*(1) Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#20, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#26, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#32, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#38]
+- *(1) FileScan csv [id#10,a1#11,a2#12,a3#13,a4#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../test.data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,a1:int,a2:int,a3:int,a4:int>

scala> freq2.explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('id, None), CASE WHEN isnotnull('a1) THEN 1 ELSE 'a1 END AS a1#46, CASE WHEN isnotnull('a2) THEN 1 ELSE 'a2 END AS a2#47, CASE WHEN isnotnull('a3) THEN 1 ELSE 'a3 END AS a3#48, CASE WHEN isnotnull('a4) THEN 1 ELSE 'a4 END AS a4#49]
+- AnalysisBarrier
      +- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Analyzed Logical Plan ==
id: int, a1: int, a2: int, a3: int, a4: int
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Optimized Logical Plan ==
Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- Relation[id#10,a1#11,a2#12,a3#13,a4#14] csv

== Physical Plan ==
*(1) Project [id#10, CASE WHEN isnotnull(a1#11) THEN 1 ELSE a1#11 END AS a1#46, CASE WHEN isnotnull(a2#12) THEN 1 ELSE a2#12 END AS a2#47, CASE WHEN isnotnull(a3#13) THEN 1 ELSE a3#13 END AS a3#48, CASE WHEN isnotnull(a4#14) THEN 1 ELSE a4#14 END AS a4#49]
+- *(1) FileScan csv [id#10,a1#11,a2#12,a3#13,a4#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:.../test.data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,a1:int,a2:int,a3:int,a4:int>

The optimized logical plans are the same for both, but this is a cleaner way to write it.
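
If you prefer not to eyeball the explain() output, the two plans can also be compared programmatically; a small sketch, assuming newfreq and freq2 are the dataframes built above:

// Catalyst's sameResult returns true when it considers two optimized plans equivalent.
val samePlan = newfreq.queryExecution.optimizedPlan
  .sameResult(freq2.queryExecution.optimizedPlan)
println(samePlan)  // true: both approaches collapse to the same single Project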
