SparkSQL referencing attributes of UDT

Question

I am trying to implement a custom UDT and be able to reference it from Spark SQL (as explained in the Spark SQL whitepaper, section 4.4.2).

The real example is to have a custom UDT backed by an off-heap data structure using Cap'n Proto, or similar.

For this posting, I have made up a contrived example. I know that I could just use Scala case classes and not have to do any work at all, but that isn't my goal.

For example, I have a Person containing several attributes and I want to be able to SELECT person.first_name FROM person. I'm running into the error Can't extract value from person#1 and I'm not sure why.

Here is the full source (also available at https://github.com/andygrove/spark-sql-udt)

package com.theotherandygrove

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Example {

  def main(arg: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("Example")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val schema = StructType(List(
      StructField("person_id", DataTypes.IntegerType, true),
      StructField("person", new MockPersonUDT, true)))

    // load initial RDD
    val rdd = sc.parallelize(List(
      MockPersonImpl(1),
      MockPersonImpl(2)
    ))

    // convert to RDD[Row]
    val rowRdd = rdd.map(person => Row(person.getAge, person))

    // convert to DataFrame (RDD + Schema)
    val dataFrame = sqlContext.createDataFrame(rowRdd, schema)

    // register as a table
    dataFrame.registerTempTable("person")

    // referencing an attribute of the UDT fails with: Can't extract value from person#1
    // (selecting the whole object, e.g. SELECT person FROM person, works fine)
    val results = sqlContext.sql("SELECT person.first_name FROM person WHERE person.age < 100")

    val people = results.collect

    people.map(row => {
      println(row)
    })

  }

}

trait MockPerson {
  def getFirstName: String
  def getLastName: String
  def getAge: Integer
  def getState: String
}

class MockPersonUDT extends UserDefinedType[MockPerson] {

  override def sqlType: DataType = StructType(List(
    StructField("firstName", StringType, nullable=false),
    StructField("lastName", StringType, nullable=false),
    StructField("age", IntegerType, nullable=false),
    StructField("state", StringType, nullable=false)
  ))

  override def userClass: Class[MockPerson] = classOf[MockPerson]

  override def serialize(obj: Any): Any = obj.asInstanceOf[MockPersonImpl].getAge

  override def deserialize(datum: Any): MockPerson = MockPersonImpl(datum.asInstanceOf[Integer])
}

@SQLUserDefinedType(udt = classOf[MockPersonUDT])
@SerialVersionUID(123L)
case class MockPersonImpl(n: Integer) extends MockPerson with Serializable {
  def getFirstName = "First" + n
  def getLastName = "Last" + n
  def getAge = n
  def getState = "AK"
}

If I simply SELECT person FROM person then the query works. I just can't reference the attributes in SQL, even though they are defined in the schema.
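
For comparison, here is a minimal check (a sketch, assuming the same sqlContext and the person temp table registered above) of the query that does work, selecting the whole UDT column:

// works: the UDT column comes back as whole MockPerson values
val whole = sqlContext.sql("SELECT person FROM person")
whole.collect.foreach(println) // each row wraps a deserialized MockPerson value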

Answer

You get this error because the schema defined by sqlType is never exposed and is not intended to be accessed directly. It simply provides a way to express complex data types using native Spark SQL types.

You can access individual attributes using UDFs, but first let's show that the internal structure is indeed not exposed:

dataFrame.printSchema
// root
//  |-- person_id: integer (nullable = true)
//  |-- person: mockperso (nullable = true)

To create UDFs we need functions which take as an argument an object of the type represented by a given UDT:

import org.apache.spark.sql.functions.udf
import sqlContext.implicits._ // provides the $"column" syntax used below

val getFirstName = (person: MockPerson) => person.getFirstName
val getLastName = (person: MockPerson) => person.getLastName
val getAge = (person: MockPerson) => person.getAge

These can be wrapped using the udf function:

val getFirstNameUDF = udf(getFirstName)
val getLastNameUDF = udf(getLastName)
val getAgeUDF = udf(getAge)

dataFrame.select(
  getFirstNameUDF($"person").alias("first_name"),
  getLastNameUDF($"person").alias("last_name"),
  getAgeUDF($"person").alias("age")
).show()

// +----------+---------+---+
// |first_name|last_name|age|
// +----------+---------+---+
// |    First1|    Last1|  1|
// |    First2|    Last2|  2|
// +----------+---------+---+

To use these with raw SQL you have to register the functions through SQLContext:

sqlContext.udf.register("first_name", getFirstName)
sqlContext.udf.register("last_name", getLastName)
sqlContext.udf.register("age", getAge)

sqlContext.sql("""
  SELECT first_name(person) AS first_name, last_name(person) AS last_name
  FROM person
  WHERE age(person) < 100""").show

// +----------+---------+
// |first_name|last_name|
// +----------+---------+
// |    First1|    Last1|
// |    First2|    Last2|
// +----------+---------+

Unfortunately, it comes with a price tag attached. First of all, every operation requires deserialization. It also substantially limits the ways in which the query can be optimized; in particular, any join operation on one of these fields requires a Cartesian product.
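
As an illustration of that join caveat, here is a hypothetical sketch (reusing the dataFrame and the first_name UDF registered above, with the same data registered a second time as person2); the join key is hidden behind a UDF over the UDT column, so it cannot be planned as an ordinary equi-join:

// hypothetical example only: a join keyed on a UDF over the UDT column
dataFrame.registerTempTable("person2")

sqlContext.sql("""
  SELECT a.person_id, b.person_id
  FROM person a JOIN person2 b
  ON first_name(a.person) = first_name(b.person)""").show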

In practice, if you want to encode a complex structure which contains attributes that can be expressed using built-in types, it is better to use StructType:

case class Person(first_name: String, last_name: String, age: Int)

val df = sc.parallelize(
  (1 to 2).map(i => (i, Person(s"First$i", s"Last$i", i)))).toDF("id", "person")

df.printSchema

// root
//  |-- id: integer (nullable = false)
//  |-- person: struct (nullable = true)
//  |    |-- first_name: string (nullable = true)
//  |    |-- last_name: string (nullable = true)
//  |    |-- age: integer (nullable = false)

df
  .where($"person.age" < 100)
  .select($"person.first_name", $"person.last_name")
  .show

// +----------+---------+
// |first_name|last_name|
// +----------+---------+
// |    First1|    Last1|
// |    First2|    Last2|
// +----------+---------+

and reserve UDTs for actual type extensions, like the built-in VectorUDT, or for things that can benefit from a specific representation, like enumerations.
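
For reference, a minimal sketch of that built-in case (assuming Spark 1.x with MLlib on the classpath and the same sc / sqlContext as above): Vector columns are stored through the built-in VectorUDT, so they work in DataFrames without any of the UDF plumbing shown earlier:

import org.apache.spark.mllib.linalg.Vectors
import sqlContext.implicits._

// each Vector value is serialized by the built-in VectorUDT behind the scenes
val vectors = sc.parallelize(Seq(
  (1, Vectors.dense(1.0, 2.0)),
  (2, Vectors.dense(3.0, 4.0))
)).toDF("id", "features")

vectors.printSchema // the features column is typed by the UDT, not by its internal struct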
