Spark 2.2 Scala DataFrame select from string array, catching errors
Question
I'm new to SparkSQL/Scala and I'm struggling with a couple of seemingly simple tasks.
I'm trying to build some dynamic SQL from a Scala String Array. I'm trying to re-type some columns in my DataFrame, but I won't know exactly which ones need retyping until runtime, when I can see the DataFrame's set of columns. So I'm trying to do this:
val cols = df.columns
val typedCols = cols.map(c => getTypedColumn(c))
df.select(...) or df.selectExpr(...) // how to invoke this with vals from my string array??
typedCols will end up being an array of strings with values like this:
["a", "cast(b as int) b", "c"]
Do I need to first build one big comma-delimited string from that array?
So, assuming this will work, I'd invoke that select statement, and it would transform my DataFrame into a new DataFrame with my desired types. But some of the records in the DataFrame will have errors and will fail the attempted re-typing.
How would I get a DataFrame result with all the good records that passed the typing, and then throw all the bad records into some kind of error bucket? Would I need to do a validation pass first, before attempting the DataFrame select?
Answer
You can use varargs:
val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c")
val typedCols = Array("a", "cast(b as int) b", "c")
df.selectExpr(typedCols: _*).show
+---+----+---+
| a| b| c|
+---+----+---+
| a| 1| c|
|foo|null|baz|
+---+----+---+
but personally I prefer Columns:
import spark.implicits._  // needed for the $"..." column syntax

val typedCols = Array($"a", $"b" cast "int", $"c")
df.select(typedCols: _*).show
How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket?
Data that failed to cast is NULL. To find the good records, use na.drop:
val result = df.selectExpr(typedCols: _*)
val good = result.na.drop()
To find the bad records, check if any column is NULL:
import org.apache.spark.sql.functions.col
val bad = result.where(result.columns.map(col(_).isNull).reduce(_ || _))
To get the mismatched data from the original df:

- If typedCols is a Seq[Column] you can:

  df.where(typedCols.map(_.isNull).reduce(_ || _))

- If typedCols is a Seq[String] you can:

  import org.apache.spark.sql.functions.expr

  df.where(typedCols.map(expr(_).isNull).reduce(_ || _))
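Putting the pieces together, a sketch of the full good/bad split, assuming Spark runs in local mode; the parquet output paths are hypothetical placeholders for the questioner's "error bucket":

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder.master("local[*]").appName("cast-split").getOrCreate()
import spark.implicits._

val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c")
val typedCols = Seq("a", "cast(b as int) b", "c")

val result = df.selectExpr(typedCols: _*)
val good   = result.na.drop()                                        // rows where every cast succeeded
val bad    = df.where(typedCols.map(expr(_).isNull).reduce(_ || _))  // original rows that failed a cast

// Persist each side separately (hypothetical paths):
good.write.mode("overwrite").parquet("/tmp/typed-good")
bad.write.mode("overwrite").parquet("/tmp/typed-errors")
```

Note that bad is filtered from the original df, so the error bucket keeps the raw string values ("foo", "bar", "baz") rather than the nulled-out cast results.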