Field data validation using spark dataframe


Question


I have a bunch of columns; sample data is shown below. I need to check the columns for errors and will have to generate two output files. I'm using Apache Spark 2.0 and I would like to do this in an efficient way.

Schema Details
---------------
EMPID - (NUMBER)
ENAME - (STRING,SIZE(50))
GENDER - (STRING,SIZE(1))

Data
----
EMPID,ENAME,GENDER
1001,RIO,M
1010,RICK,MM
1015,123MYA,F


My expected output files should be as shown below:

1.
EMPID,ENAME,GENDER
1001,RIO,M
1010,RICK,NULL
1015,NULL,F

2.
EMPID,ERROR_COLUMN,ERROR_VALUE,ERROR_DESCRIPTION
1010,GENDER,"MM","OVERSIZED"
1010,GENDER,"MM","VALUE INVALID FOR GENDER"
1015,ENAME,"123MYA","NAME SHOULD BE A STRING"

Thanks.

Answer


I have not really worked with Spark 2.0, so I'll try answering your question with a solution in Spark 1.6.

// Needed imports
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType
import sqlContext.implicits._

// Load your base data
val input  = <<your input dataframe>>

//Extract the schema of your base data
val originalSchema = input.schema

// Modify your existing schema with your additional metadata fields
val modifiedSchema= originalSchema.add("ERROR_COLUMN", StringType, true)
                                  .add("ERROR_VALUE", StringType, true)
                                  .add("ERROR_DESCRIPTION", StringType, true)

// write a custom validation function                                 
def validateColumns(row: Row): Row = {

var err_col: String = null
var err_val: String = null
var err_desc: String = null
val empId = row.getAs[String]("EMPID")
val ename = row.getAs[String]("ENAME")
val gender = row.getAs[String]("GENDER")

// do checking here and populate (err_col,err_val,err_desc) with values if applicable

Row.merge(row, Row(err_col),Row(err_val),Row(err_desc))
}

// Call your custom validation function
val validateDF = input.map { row => validateColumns(row) }  

// Reconstruct the DataFrame with additional columns                      
val checkedDf = sqlContext.createDataFrame(validateDF, modifiedSchema)

// Filter out rows having errors
val errorDf = checkedDf.filter($"ERROR_COLUMN".isNotNull && $"ERROR_VALUE".isNotNull && $"ERROR_DESCRIPTION".isNotNull)

// Filter out rows having no errors
val errorFreeDf = checkedDf.filter($"ERROR_COLUMN".isNull && $"ERROR_VALUE".isNull && $"ERROR_DESCRIPTION".isNull)
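The "do checking here" placeholder above can be filled with per-field rule checks. The sketch below is one possible version, inferred from the schema rules and sample output in the question (the helper name `checkFields` and the letters-only rule for ENAME are assumptions, not part of the original answer). It returns a list rather than a single error so that one field can produce several error rows, matching the two GENDER rows for EMPID 1010:

```scala
// Hypothetical field checks derived from the question's schema and sample output.
// Returns one (column, value, description) tuple per violated rule; empty if clean.
def checkFields(ename: String, gender: String): List[(String, String, String)] = {
  val errs = scala.collection.mutable.ListBuffer[(String, String, String)]()
  if (gender != null && gender.length > 1)
    errs += (("GENDER", gender, "OVERSIZED"))
  if (gender != null && !Set("M", "F").contains(gender))
    errs += (("GENDER", gender, "VALUE INVALID FOR GENDER"))
  if (ename != null && !ename.matches("[A-Za-z]{1,50}"))
    errs += (("ENAME", ename, "NAME SHOULD BE A STRING"))
  errs.toList
}
```

With the sample data, `checkFields("RICK", "MM")` yields both GENDER errors, `checkFields("123MYA", "F")` yields the single ENAME error, and `checkFields("RIO", "M")` yields none. To emit one error row per violation (as in output file 2), a `flatMap` over the rows fits more naturally than the single-error `Row.merge` above.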


I have used this approach personally and it works for me. I hope it points you in the right direction.
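One piece the answer leaves open is the first output file, where invalid values are replaced with NULL rather than reported. A minimal pure-Scala sketch of that substitution (the helper name `sanitize` and the validity rules are assumptions drawn from the sample data):

```scala
// Keep a field value if it passes its rule; otherwise replace it with null.
def sanitize(value: String, isValid: String => Boolean): String =
  if (value != null && isValid(value)) value else null

// "MM" fails the one-character M/F rule and becomes null; "RICK" passes.
val gender = sanitize("MM", g => g.length == 1 && Set("M", "F")(g))
val ename  = sanitize("RICK", n => n.matches("[A-Za-z]{1,50}"))
```

Applying such a function to each field inside the `map` over rows would produce the NULL-substituted rows shown in output file 1.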
