Spark 2.2 Scala DataFrame select from string array, catching errors
I'm new to SparkSQL/Scala and I'm struggling with a couple of seemingly simple tasks.
I'm trying to build some dynamic SQL from a Scala String Array. I'm trying to re-type some columns in my DataFrame, but I won't know exactly which ones I need to retype until runtime, where I can see the set of columns in the DataFrame. So I'm trying to do this:
val cols = df.columns
val typedCols = cols.map( c => getTypedColumn(c) )
df.select( ...) or df.selectExpr(...) // how to invoke this with vals from my string array??
typedCols will end up being an array of strings with values like this:
["a", "cast(b as int) b", "c"]
Do I need to create a big comma delimited string first from that array?
So, assuming this will work, I'd invoke that select statement, and it would transform my DataFrame to a new DataFrame with my desired types. But some of those records in the DataFrame will have errors, and will fail the attempted re-typing.
How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket? Would I need to do a validation pass first before attempting the DataFrame select?
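For context, a minimal sketch of what a `getTypedColumn` like the one in the question could look like. Everything here is assumed for illustration: the `retypes` map (column name to SQL type, discovered at runtime) and the column names are hypothetical, not from the original post.

```scala
// Sketch only: assume the columns needing a retype are known at runtime
// as a name -> SQL type map (names here are hypothetical).
val retypes: Map[String, String] = Map("b" -> "int")

def getTypedColumn(c: String): String =
  retypes.get(c) match {
    case Some(t) => s"cast($c as $t) $c" // retype, keep the original name
    case None    => c                    // pass the column through unchanged
  }

val cols = Array("a", "b", "c") // stands in for df.columns
val typedCols = cols.map(getTypedColumn)
// typedCols: Array(a, cast(b as int) b, c)
```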
You can just use variadic arguments:
val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c")
val typedCols = Array("a", "cast(b as int) b", "c")
df.selectExpr(typedCols: _*).show
+---+----+---+
| a| b| c|
+---+----+---+
| a| 1| c|
|foo|null|baz|
+---+----+---+
but personally I prefer columns:
val typedCols = Array($"a", $"b" cast "int", $"c")
df.select(typedCols: _*).show
How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket?
Data that failed to cast is NULL. To find the good records, use na.drop():
val result = df.selectExpr(typedCols: _*)
val good = result.na.drop()
To find the bad records, check whether any column is NULL:
import org.apache.spark.sql.functions.col
val bad = result.where(result.columns.map(col(_).isNull).reduce(_ || _))
To get the unmatched data:

If typedCols is a Seq[Column] you can:

df.where(typedCols.map(_.isNull).reduce(_ || _))

If typedCols is a Seq[String] you can:

import org.apache.spark.sql.functions.expr
df.where(typedCols.map(expr(_).isNull).reduce(_ || _))
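Note the filter above runs against the original df, not the cast result, so the failing rows keep their raw values for the error bucket. A self-contained sketch with the same toy data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder.master("local[1]").appName("unmatched").getOrCreate()
import spark.implicits._

val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c")
val typedCols = Seq("a", "cast(b as int) b", "c")

// Filter the ORIGINAL frame, so the raw value ("bar") survives for inspection.
val unmatched = df.where(typedCols.map(expr(_).isNull).reduce(_ || _))
```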