Spark 2.2 Scala DataFrame select from string array, catching errors


Problem description


I'm new to SparkSQL/Scala and I'm struggling with a couple of seemingly simple tasks.

I'm trying to build some dynamic SQL from a Scala String Array. I'm trying to re-type some columns in my DataFrame, but I won't know exactly which ones need retyping until runtime, when I can see the set of columns in the DataFrame. So I'm trying to do this:

val cols = df.columns
val typedCols = cols.map( c => getTypedColumn(c) )
df.select( ...)  or df.selectExpr(...) // how to invoke this with vals from my string array??

typedCols will end up being an array of strings with values like this:

["a", "cast(b as int) b", "c"]

Do I need to create a big comma delimited string first from that array?

So, assuming this will work, I'd invoke that select statement, and it would transform my DataFrame to a new DataFrame with my desired types. But some of those records in the DataFrame will have errors, and will fail the attempted re-typing.

How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket? Would I need to do a validation pass first before attempting the DataFrame select?

Solution

You can just use variadic arguments:

val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c")
val typedCols = Array("a", "cast(b as int) b", "c")
df.selectExpr(typedCols: _*).show

+---+----+---+
|  a|   b|  c|
+---+----+---+
|  a|   1|  c|
|foo|null|baz|
+---+----+---+

but personally I prefer columns:

val typedCols = Array($"a", $"b" cast "int", $"c")
df.select(typedCols: _*).show
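
Since the question mentions deciding at runtime which columns to retype, here is a minimal sketch of how a `Seq[Column]` could be built dynamically from `df.columns`. The `targetTypes` map and its contents are assumptions standing in for whatever logic `getTypedColumn` was meant to encode:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Hypothetical mapping, known only at runtime: which columns to cast, and to what.
// The names and types here are illustrative assumptions.
val targetTypes: Map[String, String] = Map("b" -> "int")

// Cast the columns that appear in the mapping; pass the rest through unchanged.
val typedCols: Array[Column] = df.columns.map { c =>
  targetTypes.get(c) match {
    case Some(t) => col(c).cast(t).as(c)
    case None    => col(c)
  }
}

df.select(typedCols: _*).show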

How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket?

Data that failed to cast is NULL. To find good records use na.drop:

val result = df.selectExpr(typedCols: _*)
val good = result.na.drop()

To find the bad records, check whether any column is NULL:

import org.apache.spark.sql.functions.col

val bad = result.where(result.columns.map(col(_).isNull).reduce(_ || _))

To get unmatched data:

  • If typedCols is a Seq[Column] you can:

    df.where(typedCols.map(_.isNull).reduce(_ || _))

  • If typedCols is a Seq[String] you can:

    import org.apache.spark.sql.functions.expr

    df.where(typedCols.map(expr(_).isNull).reduce(_ || _))
