Apply validation with some columns in Spark SQL
Problem description
Validation dataframe:
+---------+---------------------------+-------------------------+
|dataframe|Validation Checks |cols |
+---------+---------------------------+-------------------------+
|Attendee |isEmpty,IsNull |col1,col2,col3 |
+---------+---------------------------+-------------------------+
Attendee dataframe:
col1  col2  col3
a1    a2    a3
      b2    b3
c1    c2    c3
d1    d2    d3
Expected result dataframe:
col1  col2  col3  status
a1    a2    a3    clean
      b2    b3    dirty
c1    c2    c3    clean
d1    d2    d3    clean
Code used:
var columns = df.columns // struct(df.columns map col: _*)
val colDF = df.select(col("dataframe"))
var tablename = colDF.head().toSeq
val checkDF = df.select(col("Validation Checks"))
val opsColDF = df.select(col("cols"))
val opsColumn = opsColDF.columns
println("opsColumn :::" + opsColumn)
Recommended answer
If you have a dataframe as
+---------+-----------------+--------------+
|dataframe|Validation Checks|cols |
+---------+-----------------+--------------+
|Attendee |isEmpty,isNull |col1,col2,col3|
+---------+-----------------+--------------+
You should build the SQL query from the column values. I have created another column holding a valid query, using a udf function:
import org.apache.spark.sql.functions._

def createQueryUdf = udf((table: String, logic: String, cols: String) => {
  "select *, case when " +
    cols.split(",")
      .map(_.trim)
      .map(x => logic.split(",")
        .map(_.trim.toLowerCase)
        .map {
          case y if (y == "isempty") => s"$x like ''"
          case y => s"$y($x)"
        }.mkString(" or "))
      .mkString(" or ") +
    s" then 'dirty' else 'clean' end as status from $table"
})
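The query-building logic inside the udf can also be tried on its own, without Spark, to see the SQL it produces. A minimal sketch, where `buildQuery` is a hypothetical standalone copy of the udf body (not part of the original answer):

```scala
// Hypothetical standalone version of the udf body, for illustration only.
def buildQuery(table: String, logic: String, cols: String): String =
  "select *, case when " +
    cols.split(",")
      .map(_.trim)
      .map(x => logic.split(",")
        .map(_.trim.toLowerCase)
        .map {
          case y if (y == "isempty") => s"$x like ''"
          case y => s"$y($x)"
        }.mkString(" or "))
      .mkString(" or ") +
    s" then 'dirty' else 'clean' end as status from $table"

// For the validation row above, this prints the same query that later
// appears in the query column of dataframeWithQuery.
println(buildQuery("Attendee", "isEmpty,isNull", "col1,col2,col3"))
```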
val dataframeWithQuery = df.withColumn("query", createQueryUdf(col("dataframe"), col("Validation Checks"), col("cols")))
So dataframeWithQuery would be
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|dataframe|Validation Checks|cols |query |
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Attendee |isEmpty,isNull |col1,col2,col3|select *, case when col1 like '' or isnull(col1) or col2 like '' or isnull(col2) or col3 like '' or isnull(col3) then 'dirty' else 'clean' end as status from Attendee|
+---------+-----------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Now you can select the valid query to run against the dataframes, but before that the dataframes all have to be registered as temp views:
attendee.createOrReplaceTempView("Attendee")
Then you can just collect the query column and loop to apply the query statements:
val queryArray = dataframeWithQuery.select("query").collect.map(_.getAs[String]("query"))
for(query <- queryArray){
spark.sql(query).show(false)
}
+----+----+----+------+
|col1|col2|col3|status|
+----+----+----+------+
|a1 |a2 |a3 |clean |
| |b2 |b3 |dirty |
|c1 |c2 |c3 |clean |
|d1 |d2 |d3 |clean |
+----+----+----+------+
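If the goal is to act on the flagged rows rather than just display them, each query result can be filtered on the generated status column. A minimal end-to-end sketch, assuming a local SparkSession and recreating the Attendee data from the question (the object name `ValidationSketch` and helper `dirtyCount` are illustrative, not from the original answer):

```scala
import org.apache.spark.sql.SparkSession

object ValidationSketch {
  // Runs the generated validation query and counts the rows flagged dirty.
  def dirtyCount(): Long = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("validation-sketch")
      .getOrCreate()
    import spark.implicits._

    // Attendee data from the question; the missing col1 in row b becomes "".
    Seq(("a1", "a2", "a3"), ("", "b2", "b3"), ("c1", "c2", "c3"), ("d1", "d2", "d3"))
      .toDF("col1", "col2", "col3")
      .createOrReplaceTempView("Attendee")

    // The query produced by the udf above, copied verbatim.
    val query = "select *, case when col1 like '' or isnull(col1) or " +
      "col2 like '' or isnull(col2) or col3 like '' or isnull(col3) " +
      "then 'dirty' else 'clean' end as status from Attendee"

    val checked = spark.sql(query)
    val n = checked.filter(checked("status") === "dirty").count()
    spark.stop()
    n
  }

  def main(args: Array[String]): Unit =
    println(s"dirty rows: ${dirtyCount()}") // row b is the only dirty one
}
```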
By now you should have an idea of how to proceed further. I hope the answer is helpful.