Spark SQL referencing attributes of UDT

Question

I am trying to implement a custom UDT and be able to reference it from Spark SQL (as explained in the Spark SQL whitepaper, section 4.4.2).

The real example is to have a custom UDT backed by an off-heap data structure using Cap'n Proto, or similar.

For this posting, I have made up a contrived example. I know that I could just use Scala case classes and not have to do any work at all, but that isn't my goal.

For example, I have a Person containing several attributes and I want to be able to SELECT person.first_name FROM person. I'm running into the error Can't extract value from person#1 and I'm not sure why.

Here is the full source (also available at https://github.com/andygrove/spark-sql-udt):

package com.theotherandygrove

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Example {

  def main(arg: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("Example")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val schema = StructType(List(
      StructField("person_id", DataTypes.IntegerType, true),
      StructField("person", new MockPersonUDT, true)))

    // load initial RDD
    val rdd = sc.parallelize(List(
      MockPersonImpl(1),
      MockPersonImpl(2)
    ))

    // convert to RDD[Row]
    val rowRdd = rdd.map(person => Row(person.getAge, person))

    // convert to DataFrame (RDD + Schema)
    val dataFrame = sqlContext.createDataFrame(rowRdd, schema)

    // register as a table
    dataFrame.registerTempTable("person")

    // this is the query that fails with "Can't extract value from person#1"
    val results = sqlContext.sql("SELECT person.first_name FROM person WHERE person.age < 100")

    val people = results.collect

    people.foreach(println)

  }

}

trait MockPerson {
  def getFirstName: String
  def getLastName: String
  def getAge: Integer
  def getState: String
}

class MockPersonUDT extends UserDefinedType[MockPerson] {

  override def sqlType: DataType = StructType(List(
    StructField("firstName", StringType, nullable=false),
    StructField("lastName", StringType, nullable=false),
    StructField("age", IntegerType, nullable=false),
    StructField("state", StringType, nullable=false)
  ))

  override def userClass: Class[MockPerson] = classOf[MockPerson]

  // contrived round trip: only the integer n is stored on serialization and
  // the remaining attributes are rebuilt from it on deserialization
  override def serialize(obj: Any): Any = obj.asInstanceOf[MockPersonImpl].getAge

  override def deserialize(datum: Any): MockPerson = MockPersonImpl(datum.asInstanceOf[Integer])
}

@SQLUserDefinedType(udt = classOf[MockPersonUDT])
@SerialVersionUID(123L)
case class MockPersonImpl(n: Integer) extends MockPerson with Serializable {
  def getFirstName = "First" + n
  def getLastName = "Last" + n
  def getAge = n
  def getState = "AK"
}

If I simply SELECT person FROM person then the query works. I just can't reference the attributes in SQL, even though they are defined in the schema.
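
For reference, a minimal check along these lines (not part of the repository code) confirms this:

// works: only dotted attribute access such as person.first_name fails
sqlContext.sql("SELECT person FROM person").collect.foreach(println)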

Answer

You get this error because the schema defined by sqlType is never exposed and is not intended to be accessed directly. It simply provides a way to express complex data types using native Spark SQL types.

You can access individual attributes using UDFs, but first let's show that the internal structure is indeed not exposed:

dataFrame.printSchema
// root
//  |-- person_id: integer (nullable = true)
//  |-- person: mockperso (nullable = true)

To create the UDFs we need functions that take as an argument an object of the type represented by the given UDT:

import org.apache.spark.sql.functions.udf
import sqlContext.implicits._  // for the $"..." column syntax and toDF used below

val getFirstName = (person: MockPerson) => person.getFirstName
val getLastName = (person: MockPerson) => person.getLastName
val getAge = (person: MockPerson) => person.getAge

which can be wrapped using the udf function:

val getFirstNameUDF = udf(getFirstName)
val getLastNameUDF = udf(getLastName)
val getAgeUDF = udf(getAge)

dataFrame.select(
  getFirstNameUDF($"person").alias("first_name"),
  getLastNameUDF($"person").alias("last_name"),
  getAgeUDF($"person").alias("age")
).show()

// +----------+---------+---+
// |first_name|last_name|age|
// +----------+---------+---+
// |    First1|    Last1|  1|
// |    First2|    Last2|  2|
// +----------+---------+---+

To use these with raw SQL you have to register the functions through SQLContext:

sqlContext.udf.register("first_name", getFirstName)
sqlContext.udf.register("last_name", getLastName)
sqlContext.udf.register("age", getAge)

sqlContext.sql("""
  SELECT first_name(person) AS first_name, last_name(person) AS last_name
  FROM person
  WHERE age(person) < 100""").show

// +----------+---------+
// |first_name|last_name|
// +----------+---------+
// |    First1|    Last1|
// |    First2|    Last2|
// +----------+---------+

Unfortunately it comes with a price tag attached. First of all, every operation requires deserialization. It also substantially limits the ways in which the query can be optimized; in particular, any join operation on one of these fields requires a Cartesian product.
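
One possible mitigation, building on the UDFs defined above, is to extract the needed attributes into plain columns once, so that later filters and joins operate on native types instead of the UDT (the flattened column names below are just illustrative):

// sketch: materialize the UDT attributes as ordinary columns up front
val flattened = dataFrame
  .withColumn("first_name", getFirstNameUDF($"person"))
  .withColumn("age", getAgeUDF($"person"))

// subsequent operations, including joins, can then use the plain columns
flattened.where($"age" < 100).select($"person_id", $"first_name").show()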

In practice, if you want to encode a complex structure whose attributes can be expressed using built-in types, it is better to use a plain StructType:

case class Person(first_name: String, last_name: String, age: Int)

val df = sc.parallelize(
  (1 to 2).map(i => (i, Person(s"First$i", s"Last$i", i)))).toDF("id", "person")

df.printSchema

// root
//  |-- id: integer (nullable = false)
//  |-- person: struct (nullable = true)
//  |    |-- first_name: string (nullable = true)
//  |    |-- last_name: string (nullable = true)
//  |    |-- age: integer (nullable = false)

df
  .where($"person.age" < 100)
  .select($"person.first_name", $"person.last_name")
  .show

// +----------+---------+
// |first_name|last_name|
// +----------+---------+
// |    First1|    Last1|
// |    First2|    Last2|
// +----------+---------+
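
Since the struct fields are ordinary nested columns, the query from the question also works in raw SQL; a quick sketch (the temp table name below is just illustrative):

df.registerTempTable("person_struct")

sqlContext.sql(
  "SELECT person.first_name FROM person_struct WHERE person.age < 100").show

// +----------+
// |first_name|
// +----------+
// |    First1|
// |    First2|
// +----------+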

Reserve UDTs for actual type extensions, like the built-in VectorUDT, or for things that can benefit from a specific representation, like enumerations.
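
For instance, a column of MLlib vectors is backed by such a built-in UDT; a minimal sketch, assuming Spark 1.x with spark-mllib on the classpath:

import org.apache.spark.mllib.linalg.Vectors

val vectors = sc.parallelize(Seq(
  (1, Vectors.dense(1.0, 2.0)),
  (2, Vectors.dense(3.0, 4.0))
)).toDF("id", "features")

vectors.printSchema
// root
//  |-- id: integer (nullable = false)
//  |-- features: vector (nullable = true)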
