Spark 2.2 Scala DataFrame select from string array, catching errors


Problem Description

I'm new to SparkSQL/Scala and I'm struggling with a couple of seemingly simple tasks.

I'm trying to build some dynamic SQL from a Scala string array. I need to re-type some of the columns in my DataFrame, but I won't know exactly which ones until runtime, when I can see the set of columns in the DataFrame. So I'm trying to do something like this:

val cols = df.columns
val typedCols = cols.map( c => getTypedColumn(c) )
df.select(...) or df.selectExpr(...) // how do I invoke this with the values from my string array?

typedCols will end up being an array of strings with values like this:

["a", "cast(b as int) b", "c"]

Do I need to first build one big comma-delimited string from that array?

So, assuming that works, I'd invoke that select statement and it would transform my DataFrame into a new DataFrame with the desired types. But some of the records in the DataFrame will have errors and will fail the attempted re-typing.

How would I get a DataFrame result with all the good records that passed the typing, and then throw all the bad records into some kind of error bucket? Would I need to do a validation pass first, before attempting the DataFrame select?

Recommended Answer

You can use varargs:

import spark.implicits._  // for toDF and the $ column syntax (assumes an active SparkSession named spark)

val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c")
val typedCols = Array("a", "cast(b as int) b", "c")
df.selectExpr(typedCols: _*).show

+---+----+---+
|  a|   b|  c|
+---+----+---+
|  a|   1|  c|
|foo|null|baz|
+---+----+---+

but personally I prefer Columns:

val typedCols = Array($"a", $"b" cast "int", $"c")
df.select(typedCols: _*).show
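
As for producing typedCols dynamically, the asker's getTypedColumn could look something like the hypothetical sketch below, assuming the desired target types are known at runtime as a Map from column name to SQL type (the map and its contents are illustrative assumptions, not part of the original question):

// Hypothetical helper: columns listed in targetTypes get a cast expression,
// everything else is passed through unchanged.
def getTypedColumn(c: String, targetTypes: Map[String, String]): String =
  targetTypes.get(c) match {
    case Some(t) => s"cast($c as $t) $c"
    case None    => c
  }

val targetTypes = Map("b" -> "int")  // assumed to be known at runtime
val typedCols = df.columns.map(getTypedColumn(_, targetTypes))
// typedCols: Array("a", "cast(b as int) b", "c")
df.selectExpr(typedCols: _*).show

There is no need to build a comma-delimited string first; the varargs expansion (typedCols: _*) passes the whole array to selectExpr.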

How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket?

Data that failed the cast comes back as NULL. To find the good records, use na.drop:

val result = df.selectExpr(typedCols: _*)
val good = result.na.drop()
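
Calling na.drop() with no arguments drops any row that has a NULL in any column. If some of the original columns may legitimately contain NULLs, you can restrict the check to just the columns you actually cast; na.drop also accepts a sequence of column names (restricting it to b here is an assumption based on the example above):

// restrict the NULL check to the columns that were cast (here just "b")
val goodCastOnly = result.na.drop(Seq("b"))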

To find the bad records, check whether any column is NULL:

import org.apache.spark.sql.functions.col

val bad = result.where(result.columns.map(col(_).isNull).reduce(_ || _))
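
With the example DataFrame above, good keeps only the (a, 1, c) row, while bad contains the (foo, null, baz) row whose b value could not be cast to int.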

To get the mismatched data (the original, un-cast rows that fail the typing):

  • If typedCols is a Seq[Column], you can:

df.where(typedCols.map(_.isNull).reduce(_ || _))  

  • If typedCols is a Seq[String], you can:

    import org.apache.spark.sql.functions.expr
    
    df.where(typedCols.map(expr(_).isNull).reduce(_ || _))  
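
Putting it all together, here is a minimal end-to-end sketch (assuming the Seq[String] form of typedCols from above): cast everything with selectExpr, keep the rows where every cast succeeded, and pull the untouched originals of the failing rows from the source DataFrame so the error bucket preserves the raw values:

import org.apache.spark.sql.functions.expr

val typedCols = Seq("a", "cast(b as int) b", "c")
val result = df.selectExpr(typedCols: _*)

val good = result.na.drop()                                        // typed rows where every cast succeeded
val bad  = df.where(typedCols.map(expr(_).isNull).reduce(_ || _))  // original rows that failed a cast (or were already NULL)

good.show()
bad.show()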
    
