Apply validation with some columns in Spark SQL

Problem description

Validation dataframe:

+---------+---------------------------+-------------------------+
|dataframe|Validation Checks          |cols                     |
+---------+---------------------------+-------------------------+
|Attendee |isEmpty,IsNull             |col1,col2,col3           |
+---------+---------------------------+-------------------------+

Attendee dataframe:

    col1    col2    col3
    a1       a2     a3
             b2     b3
    c1       c2     c3
    d1       d2     d3

Expected result dataframe:

    col1    col2    col3   status
    a1       a2     a3      clean
             b2     b3      dirty  
    c1       c2     c3      clean
    d1       d2     d3      clean

Code used so far:

var columns = df.columns // struct(df.columns map col: _*)
val colDF = df.select(col("dataframe"))
var tablename = colDF.head().toSeq
val checkDF = df.select(col("Validation Checks"))
val opsColDF = df.select(col("cols"))
val opsColumn = opsColDF.columns
println("opsColumn :::" + opsColumn.mkString(","))
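
For anyone who wants to reproduce the setup, here is a minimal sketch constructing the two input dataframes. The variable names df and attendee are assumptions, and a SparkSession named spark is assumed to be in scope:

import spark.implicits._

// Hypothetical construction of the validation dataframe from the question
val df = Seq(
  ("Attendee", "isEmpty,isNull", "col1,col2,col3")
).toDF("dataframe", "Validation Checks", "cols")

// Hypothetical construction of the Attendee dataframe; the blank entry in
// col1 is the row expected to be flagged as dirty
val attendee = Seq(
  ("a1", "a2", "a3"),
  ("",   "b2", "b3"),
  ("c1", "c2", "c3"),
  ("d1", "d2", "d3")
).toDF("col1", "col2", "col3")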


Recommended answer

If you have a dataframe such as

+---------+-----------------+--------------+
|dataframe|Validation Checks|cols          |
+---------+-----------------+--------------+
|Attendee |isEmpty,isNull   |col1,col2,col3|
+---------+-----------------+--------------+

You should build the SQL query from the column values. I have created another column holding a valid query for each row, using a udf function:

import org.apache.spark.sql.functions._

// Build a SQL string from the table name, the comma-separated checks and the
// comma-separated column names: every (column, check) pair becomes one predicate,
// and the predicates are combined with "or" into a single case expression.
def createQueryUdf = udf((table: String, logic: String, cols: String) => {
  "select *, case when " +
    cols.split(",")
      .map(_.trim)
      .map(x => logic.split(",")
        .map(_.trim.toLowerCase)
        .map{
            case y if (y == "isempty") => s"$x like ''" // blank string check
            case y => s"$y($x)"                         // e.g. isnull -> isnull(col1)
        }.mkString(" or "))
      .mkString(" or ") +
  s" then 'dirty' else 'clean' end as status from $table"
})

val dataframeWithQuery = df.withColumn("query", createQueryUdf(col("dataframe"), col("Validation Checks"), col("cols")))

So dataframeWithQuery would be

+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|dataframe|Validation Checks|cols          |query                                                                                                                                                                 |
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Attendee |isEmpty,isNull   |col1,col2,col3|select *, case when col1 like '' or isnull(col1) or col2 like '' or isnull(col2) or col3 like '' or isnull(col3) then 'dirty' else 'clean' end as status from Attendee|
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Now you can pick up the generated query and run it against the dataframes, but before that each dataframe has to be registered as a temp view:

attendee.createOrReplaceTempView("Attendee")
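
If the validation dataframe lists more than one table, each one needs a matching temp view. A hedged sketch of how the registration could be generalized; the frames map is a hypothetical helper, not part of the original answer:

// Map each table name from the "dataframe" column to its dataframe;
// only Attendee exists in this question
val frames = Map("Attendee" -> attendee)
frames.foreach { case (name, frame) => frame.createOrReplaceTempView(name) }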

Then you can just collect the query column and loop over it to apply the query statements:

val queryArray = dataframeWithQuery.select("query").collect.map(_.getAs[String]("query"))

for(query <- queryArray){
  spark.sql(query).show(false)
}

+----+----+----+------+
|col1|col2|col3|status|
+----+----+----+------+
|a1  |a2  |a3  |clean |
|    |b2  |b3  |dirty |
|c1  |c2  |c3  |clean |
|d1  |d2  |d3  |clean |
+----+----+----+------+
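
If the validated results are needed for further processing rather than only printed, the loop can keep the resulting dataframes instead (a sketch; the results name is an assumption):

// Run every generated query and keep the resulting dataframes
val results = queryArray.map(spark.sql)
results.foreach(_.show(false))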

By now you should have an idea of how to proceed further, for example by teaching the udf additional check keywords, as sketched below. I hope the answer is helpful.
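
A hedged sketch of such an extension; the isNegative keyword and the createQueryUdfExtended name are assumptions for illustration, not part of the original answer:

import org.apache.spark.sql.functions._

// Same structure as createQueryUdf, with one extra, hypothetical check keyword
def createQueryUdfExtended = udf((table: String, logic: String, cols: String) => {
  "select *, case when " +
    cols.split(",").map(_.trim).map { x =>
      logic.split(",").map(_.trim.toLowerCase).map {
        case "isempty"    => s"$x like ''" // blank string check
        case "isnegative" => s"$x < 0"     // hypothetical check for numeric columns
        case y            => s"$y($x)"     // fall back to a SQL function such as isnull
      }.mkString(" or ")
    }.mkString(" or ") +
    s" then 'dirty' else 'clean' end as status from $table"
})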
