Iterate through Columns of a Spark Dataframe and update specified values

Problem description

To iterate through the columns of a Spark Dataframe created from a Hive table and update all occurrences of the desired column values, I tried the following code.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val a: DataFrame = spark.sql(s"select * from default.table_a")

val column_names: Array[String] = a.columns

val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date"))

val func = udf((value: String) => { if (value == "XXXX" || value == "WWWW" || value == "TTTT") "NULL" else value })

val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}

When I executed the code in the spark shell, I got the following error.

scala> val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}
<console>:35: error: value a is not a member of org.apache.spark.sql.DataFrame
       val b = {for (column: String <- required_column_list) { a.withColumn(column , isNull(a(column))) } a}
                                                                                                          ^ 

I also tried the following statement, but it didn't give the required output.

val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }

The variable b is created as a Unit instead of a DataFrame.

scala> val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }
    b: Unit = ()

Please suggest a better way to iterate through the columns of the Dataframe and update all occurrences of values in those columns, or correct me where I am wrong. Any other solution is also appreciated. Thanks in advance.

Recommended answer

Instead of a for loop, you should go with foldLeft. And you don't need a udf function when an inbuilt function can be used:

val column_names: Array[String] = a.columns

val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date"))

import org.apache.spark.sql.functions._
val b = required_columns.foldLeft(a) { (tempdf, colName) =>
  tempdf.withColumn(
    colName,
    when(col(colName) === "XXXX" || col(colName) === "WWWW" || col(colName) === "TTTT", "NULL")
      .otherwise(col(colName))
  )
}

I hope the answer is helpful.


required_columns.foldLeft(a){(tempdf, colName) => tempdf.withColumn(colName, when(col(colName) === "XXXX" || col(colName) === "WWWW" || col(colName) === "TTTT", "NULL").otherwise(col(colName)))}

required_columns is the array of column names from the a dataframe/dataset that end with _date; each of these is the colName passed to withColumn.

tempdf is the accumulating dataframe/dataset, starting out as the original a.

The when function is applied inside withColumn and replaces every XXXX, WWWW or TTTT value with NULL.
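As a side note, if the list of marker values grows, the same condition can be written more compactly with isin instead of chaining === comparisons; this is just an equivalent sketch of the same transformation:

val b = required_columns.foldLeft(a) { (tempdf, colName) =>
  tempdf.withColumn(colName, when(col(colName).isin("XXXX", "WWWW", "TTTT"), "NULL").otherwise(col(colName)))
}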

Finally, foldLeft returns the dataframe with all of the transformations applied, and that result is assigned to b.
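If you want to sanity-check the approach end to end, here is a minimal self-contained sketch; the sample rows, the column names id, start_date and end_date, and the local SparkSession are made up purely for illustration and do not come from default.table_a:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session only for the demo; in spark-shell the `spark` value already exists.
val spark = SparkSession.builder().appName("replace-date-markers").master("local[*]").getOrCreate()
import spark.implicits._

// Toy stand-in for default.table_a with two hypothetical *_date columns.
val a = Seq(
  ("1", "XXXX", "2019-01-01"),
  ("2", "2019-02-01", "TTTT"),
  ("3", "WWWW", "2019-03-01")
).toDF("id", "start_date", "end_date")

val required_columns = a.columns.filter(_.endsWith("_date"))

// Thread the dataframe through foldLeft so each withColumn builds on the previous result.
val b = required_columns.foldLeft(a) { (tempdf, colName) =>
  tempdf.withColumn(
    colName,
    when(col(colName) === "XXXX" || col(colName) === "WWWW" || col(colName) === "TTTT", "NULL")
      .otherwise(col(colName))
  )
}

// Every XXXX/WWWW/TTTT in the *_date columns comes back as the string "NULL"; other values are untouched.
b.show(false)

Each iteration of foldLeft hands the DataFrame returned by withColumn to the next one, which is exactly the chaining the original for loop was missing: its withColumn results were discarded and the loop itself evaluated to Unit.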
