Spark SQL referencing attributes of UDT


Problem description

I am trying to implement a custom UDT and be able to reference it from Spark SQL (as explained in the Spark SQL whitepaper, section 4.4.2).

The real example is to have a custom UDT backed by an off-heap data structure using Cap'n Proto, or similar.

For this posting, I have made up a contrived example. I know that I could just use Scala case classes and not have to do any work at all, but that isn't my goal.

For example, I have a Person containing several attributes and I want to be able to SELECT person.first_name FROM person. I'm running into the error Can't extract value from person#1 and I'm not sure why.

Here is the full source (also available at https://github.com/andygrove/spark-sql-udt):

package com.theotherandygrove

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Example {

  def main(arg: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("Example")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val schema = StructType(List(
      StructField("person_id", DataTypes.IntegerType, true),
      StructField("person", new MockPersonUDT, true)))

    // load initial RDD
    val rdd = sc.parallelize(List(
      MockPersonImpl(1),
      MockPersonImpl(2)
    ))

    // convert to RDD[Row]
    val rowRdd = rdd.map(person => Row(person.getAge, person))

    // convert to DataFrame (RDD + Schema)
    val dataFrame = sqlContext.createDataFrame(rowRdd, schema)

    // register as a table
    dataFrame.registerTempTable("person")

    // selecting the whole object (SELECT person FROM person) works fine,
    // but referencing an attribute fails with "Can't extract value from person#1"
    val results = sqlContext.sql("SELECT person.first_name FROM person WHERE person.age < 100")

    val people = results.collect

    people.map(row => {
      println(row)
    })

  }

}

trait MockPerson {
  def getFirstName: String
  def getLastName: String
  def getAge: Integer
  def getState: String
}

class MockPersonUDT extends UserDefinedType[MockPerson] {

  override def sqlType: DataType = StructType(List(
    StructField("firstName", StringType, nullable=false),
    StructField("lastName", StringType, nullable=false),
    StructField("age", IntegerType, nullable=false),
    StructField("state", StringType, nullable=false)
  ))

  override def userClass: Class[MockPerson] = classOf[MockPerson]

  // contrived (de)serialization: only the numeric seed `n` is stored,
  // and the full MockPersonImpl is rebuilt from it on read
  override def serialize(obj: Any): Any = obj.asInstanceOf[MockPersonImpl].getAge

  override def deserialize(datum: Any): MockPerson = MockPersonImpl(datum.asInstanceOf[Integer])
}

@SQLUserDefinedType(udt = classOf[MockPersonUDT])
@SerialVersionUID(123L)
case class MockPersonImpl(n: Integer) extends MockPerson with Serializable {
  def getFirstName = "First" + n
  def getLastName = "Last" + n
  def getAge = n
  def getState = "AK"
}

If I simply SELECT person FROM person then the query works. I just can't reference the attributes in SQL, even though they are defined in the schema.
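
To make that concrete, the two queries below run against the same registered temp table; the first works, the second produces the error quoted above:

// selecting the UDT column as a whole works
sqlContext.sql("SELECT person FROM person").show()

// referencing an attribute of the UDT column fails with
// "Can't extract value from person#1"
// sqlContext.sql("SELECT person.first_name FROM person").show()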

Answer

You get this error because the schema defined by sqlType is never exposed and is not intended to be accessed directly. It simply provides a way to express a complex data type using native Spark SQL types.

You can access individual attributes using UDFs, but first let's show that the internal structure is indeed not exposed:

dataFrame.printSchema
// root
//  |-- person_id: integer (nullable = true)
//  |-- person: mockperso (nullable = true)

To create UDFs we need functions that take as an argument an object of the type represented by a given UDT:

import org.apache.spark.sql.functions.udf
// needed for the $"..." column syntax used below
import sqlContext.implicits._

val getFirstName = (person: MockPerson) => person.getFirstName
val getLastName = (person: MockPerson) => person.getLastName
val getAge = (person: MockPerson) => person.getAge

which can be wrapped using the udf function:

val getFirstNameUDF = udf(getFirstName)
val getLastNameUDF = udf(getLastName)
val getAgeUDF = udf(getAge)

dataFrame.select(
  getFirstNameUDF($"person").alias("first_name"),
  getLastNameUDF($"person").alias("last_name"),
  getAgeUDF($"person").alias("age")
).show()

// +----------+---------+---+
// |first_name|last_name|age|
// +----------+---------+---+
// |    First1|    Last1|  1|
// |    First2|    Last2|  2|
// +----------+---------+---+

To use these with raw SQL you have to register the functions through SQLContext:

sqlContext.udf.register("first_name", getFirstName)
sqlContext.udf.register("last_name", getLastName)
sqlContext.udf.register("age", getAge)

sqlContext.sql("""
  SELECT first_name(person) AS first_name, last_name(person) AS last_name
  FROM person
  WHERE age(person) < 100""").show

// +----------+---------+
// |first_name|last_name|
// +----------+---------+
// |    First1|    Last1|
// |    First2|    Last2|
// +----------+---------+

Unfortunately, it comes with a price tag attached. First of all, every operation requires deserialization. It also substantially limits the ways in which the query can be optimized; in particular, any join operation on one of these fields requires a Cartesian product.
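
For instance, even a simple filter on one of these attributes has to go through a UDF, so every row's person value is deserialized back into a MockPerson object before the predicate can be evaluated; a minimal sketch reusing the getAgeUDF defined above:

// each row is converted back into a MockPerson via MockPersonUDT.deserialize
// before getAge can run
dataFrame.filter(getAgeUDF($"person") < 100).count()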

In practice, if you want to encode a complex structure whose attributes can be expressed using built-in types, it is better to use StructType:

case class Person(first_name: String, last_name: String, age: Int)

val df = sc.parallelize(
  (1 to 2).map(i => (i, Person(s"First$i", s"Last$i", i)))).toDF("id", "person")

df.printSchema

// root
//  |-- id: integer (nullable = false)
//  |-- person: struct (nullable = true)
//  |    |-- first_name: string (nullable = true)
//  |    |-- last_name: string (nullable = true)
//  |    |-- age: integer (nullable = false)

df
  .where($"person.age" < 100)
  .select($"person.first_name", $"person.last_name")
  .show

// +----------+---------+
// |first_name|last_name|
// +----------+---------+
// |    First1|    Last1|
// |    First2|    Last2|
// +----------+---------+

and reserve UDTs for actual type extensions, like the built-in VectorUDT, or for things that can benefit from a specific representation, like enumerations.
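
As a point of reference, here is a minimal sketch of what such a built-in extension looks like, assuming Spark's MLlib is on the classpath:

import org.apache.spark.mllib.linalg.Vectors
import sqlContext.implicits._

// MLlib vectors are backed by the built-in VectorUDT, a case where a
// dedicated representation genuinely pays off
val vecDF = sc.parallelize(Seq(
  (1, Vectors.dense(1.0, 0.0)),
  (2, Vectors.dense(0.0, 1.0))
)).toDF("id", "features")

// the "features" column is stored using VectorUDT's underlying sqlType
// representation, not as a plain struct of doubles
vecDF.printSchema()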
