Spark SQL UDF with complex input parameter

Problem Description

I'm trying to use a UDF with an input type of Array of struct. I have the following structure of data (this is only the relevant part of a bigger structure):

 |-- investments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- funding_round: struct (nullable = true)
 |    |    |    |-- company: struct (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- permalink: string (nullable = true)
 |    |    |    |-- funded_day: long (nullable = true)
 |    |    |    |-- funded_month: long (nullable = true)
 |    |    |    |-- funded_year: long (nullable = true)
 |    |    |    |-- raised_amount: long (nullable = true)
 |    |    |    |-- raised_currency_code: string (nullable = true)
 |    |    |    |-- round_code: string (nullable = true)
 |    |    |    |-- source_description: string (nullable = true)
 |    |    |    |-- source_url: string (nullable = true)

I declared the case classes:

case class Company(name: String, permalink: String)

case class FundingRound(
  company: Company,
  funded_day: Long,
  funded_month: Long,
  funded_year: Long,
  raised_amount: Long,
  raised_currency_code: String,
  round_code: String,
  source_description: String,
  source_url: String)

case class Investments(funding_round: FundingRound)

The UDF declaration:

sqlContext.udf.register("total_funding", (investments: Seq[Investments]) => {
  val totals = investments.map(r => r.funding_round.raised_amount)
  totals.sum
})

When I execute the following transformation, the result is as expected:

scala> sqlContext.sql("""select total_funding(investments) from companies""")
res11: org.apache.spark.sql.DataFrame = [_c0: bigint]

But when an action like collect is executed, I get an error:

Executor: Exception in task 0.0 in stage 4.0 (TID 10)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line33.$read$$iwC$$iwC$Investments

Thanks for your help.

Recommended Answer

The error you see should be pretty much self-explanatory. There is a strict mapping between Catalyst / SQL types and Scala types, which can be found in the relevant section of the Spark SQL, DataFrames and Datasets Guide.

In particular, struct types are converted to o.a.s.sql.Row (in your particular case the data will be exposed as Seq[Row]).
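For instance, the total_funding UDF from the question has to operate on Row values rather than on the case classes. A minimal sketch, assuming the schema shown above (note that getAs[Long] will fail on null values; the Try-wrapped version further below is more defensive):

import org.apache.spark.sql.Row

sqlContext.udf.register("total_funding", (investments: Seq[Row]) => {
  // each array element is a Row; drill into the nested funding_round struct
  investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount")).sum
})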

There are different methods that can be used to expose the data as specific types:

  • Defining a UDT (user-defined type), which has been removed in 2.0.0 and has no replacement for now.
  • Converting the DataFrame to a Dataset[T], where T is the desired local type.

Only the latter approach is applicable in this particular scenario.
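A minimal sketch of the Dataset route, reusing the case classes from the question. It assumes Spark 1.6+, that df is the companies DataFrame, and a hypothetical CompanyInvestments wrapper whose field name matches the selected column:

import sqlContext.implicits._

case class CompanyInvestments(investments: Seq[Investments])

// typed view over the relevant column; null nested fields would still need handling
val totals = df.select($"investments")
  .as[CompanyInvestments]
  .map(_.investments.map(_.funding_round.raised_amount).sum)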

If you want to access investments.funding_round.raised_amount using a UDF, you'll need something like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val getRaisedAmount = udf((investments: Seq[Row]) => scala.util.Try(
  investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount"))
).toOption)
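The resulting column function can then be applied in a regular select (hypothetical usage, assuming df is the companies DataFrame):

import sqlContext.implicits._

df.select(getRaisedAmount($"investments").alias("raised_amounts"))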

But a simple select should be much safer and cleaner:

df.select($"investments.funding_round.raised_amount")
