具有复杂输入参数的 Spark SQL UDF [英] Spark SQL UDF with complex input parameter
问题描述
我正在尝试将 UDF 与结构的输入类型数组一起使用.我有以下数据结构,这只是更大结构的相关部分
I'm trying to use UDF with input type Array of struct. I have the following structure of data this is only relevant part of a bigger structure
|--investments: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- funding_round: struct (nullable = true)
| | | |-- company: struct (nullable = true)
| | | | |-- name: string (nullable = true)
| | | | |-- permalink: string (nullable = true)
| | | |-- funded_day: long (nullable = true)
| | | |-- funded_month: long (nullable = true)
| | | |-- funded_year: long (nullable = true)
| | | |-- raised_amount: long (nullable = true)
| | | |-- raised_currency_code: string (nullable = true)
| | | |-- round_code: string (nullable = true)
| | | |-- source_description: string (nullable = true)
| | | |-- source_url: string (nullable = true)
我声明了案例类:
case class Company(name: String, permalink: String)
case class FundingRound(company: Company, funded_day: Long, funded_month: Long, funded_year: Long, raised_amount: Long, raised_currency_code: String, round_code: String, source_description: String, source_url: String)
case class Investments(funding_round: FundingRound)
UDF 声明:
sqlContext.udf.register("total_funding", (investments:Seq[Investments]) => {
val totals = investments.map(r => r.funding_round.raised_amount)
totals.sum
})
当我执行以下转换时,结果如预期
When I'm executing the following transformation the result is as expected
scala> sqlContext.sql("""select total_funding(investments) from companies""")
res11: org.apache.spark.sql.DataFrame = [_c0: bigint]
但是当执行像 collect 这样的操作时我有一个错误:
But when an action executed like collect I have an error:
Executor: Exception in task 0.0 in stage 4.0 (TID 10)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line33.$read$$iwC$$iwC$Investments
感谢您的帮助.
推荐答案
您看到的错误应该是不言自明的.Catalyst/SQL 类型和 Scala 类型之间有严格的映射,可以在 相关部分 Spark SQL,数据帧和数据集指南.
The error you see should be pretty much self-explanatory. There is a strict mapping between Catalyst / SQL types and Scala types which can be found in the relevant section of the Spark SQL, DataFrames and Datasets Guide.
特别是 struct
类型被转换为 oassql.Row
(在您的特定情况下,数据将公开为 Seq[Row]
).
In particular struct
types are converted to o.a.s.sql.Row
(in your particular case data will be exposed as Seq[Row]
).
可以使用不同的方法将数据公开为特定类型:
There are different methods which can be used to expose data as specific types:
- 定义 UDT(用户定义类型),其中 已在 2.0.0 中删除 并且没有替代暂时.
- 将
DataFrame
转换为Dataset[T]
,其中T
是所需的本地类型.
- Defining UDT (user defined type) which has been removed in 2.0.0 and has no replacement for now.
- Converting
DataFrame
toDataset[T]
whereT
is a desired local type.
只有前一种方法适用于这种特殊情况.
with only the former approach could be applicable in this particular scenario.
如果您想使用 UDF 访问 investments.funding_round.raised_amount
,您需要这样的东西:
If you want to access investments.funding_round.raised_amount
using UDF you'll need something like this:
val getRaisedAmount = udf((investments: Seq[Row]) => scala.util.Try(
investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount"))
).toOption)
但简单的select
应该更安全、更干净:
but simple select
should be much safer and cleaner:
df.select($"investments.funding_round.raised_amount")
这篇关于具有复杂输入参数的 Spark SQL UDF的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!