How to define schema for custom type in Spark SQL?


Question


The following example code tries to put some case objects into a dataframe. The code includes the definition of a case object hierarchy and a case class using this trait:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String, t: Some)

object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
    df.show()
  }
}    

When executing the code, I unfortunately encounter the following exception:

java.lang.UnsupportedOperationException: Schema for type Some is not supported

Questions

  • Is there a possibility to add or define a schema for certain types (here type Some)?
  • Does another approach exist to represent this kind of enumerations?
    • I tried to use Enumeration directly, but without success either (see below).

Code for Enumeration:

object Some extends Enumeration {
  type Some = Value
  val AType, BType = Value
}

Thanks in advance. I hope the best approach is not to use strings instead.

Solution

Spark 2.0.0+:

UserDefinedType has been made private in Spark 2.0.0 and, as of now, it has no Dataset-friendly replacement.

See: SPARK-14155 (Hide UserDefinedType in Spark 2.0)

Most of the time a statically typed Dataset can serve as a replacement. There is a pending Jira, SPARK-7768, to make the UDT API public again, with target version 2.4.

See also How to store custom objects in Dataset?
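For the hierarchy from the question, one way to follow that advice in Spark 2.x is to keep a supported representation inside the Dataset (here an Int, mirroring the UDT encoding further down) and convert to and from the sealed trait at the boundary. A minimal sketch only, assuming Spark 2.x; the DataRow, toRow and fromRow names are illustrative, not part of any API:

import org.apache.spark.sql.SparkSession

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data(name: String, t: Some)      // domain shape from the question
case class DataRow(name: String, t: Int)    // storage shape with a built-in encoder

object DatasetExample {
  // Domain object -> storage representation
  def toRow(d: Data): DataRow = DataRow(d.name, d.t match {
    case AType => 0
    case BType => 1
  })

  // Storage representation -> domain object
  def fromRow(r: DataRow): Data = Data(r.name, r.t match {
    case 0 => AType
    case 1 => BType
  })

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Example")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Work with the encoder-friendly shape inside Spark SQL ...
    val ds = Seq(Data("a", AType), Data("b", BType)).map(toRow).toDS()
    ds.show()

    // ... and map back to the domain type on the driver side.
    val back: Array[Data] = ds.collect().map(fromRow)
    println(back.mkString(", "))

    spark.stop()
  }
}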

Spark < 2.0.0

Is there a possibility to add or define a schema for certain types (here type Some)?

I guess the answer depends on how badly you need this. It looks like it is possible to create a UserDefinedType, but it requires access to the DeveloperApi and is not exactly straightforward or well documented.

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some

class SomeUDT extends UserDefinedType[Some] {
  override def sqlType: DataType = IntegerType

  override def serialize(obj: Any) = {
    obj match {
      case AType => 0
      case BType => 1
    }
  }

  override def deserialize(datum: Any): Some = {
    datum match {
      case 0 => AType
      case 1 => BType
    }
  }

  override def userClass: Class[Some] = classOf[Some]
}

You should probably override hashCode and equals as well.
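A sketch of what those overrides could look like; since the UDT carries no state, comparing by class is one simple option (the trait and case objects are repeated only so the snippet stands on its own):

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some

class SomeUDT extends UserDefinedType[Some] {
  override def sqlType: DataType = IntegerType

  override def serialize(obj: Any) = obj match {
    case AType => 0
    case BType => 1
  }

  override def deserialize(datum: Any): Some = datum match {
    case 0 => AType
    case 1 => BType
  }

  override def userClass: Class[Some] = classOf[Some]

  // All SomeUDT instances are interchangeable, so equality by class suffices.
  override def equals(other: Any): Boolean = other match {
    case _: SomeUDT => true
    case _          => false
  }

  override def hashCode(): Int = getClass.getName.hashCode
}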

Its PySpark counterpart can look like this:

from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType

class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(self):
        return IntegerType()

    @classmethod
    def module(cls):
        return cls.__module__

    @classmethod 
    def scalaUDT(cls): # Required in Spark < 1.5
        return 'net.zero323.enum.SomeUDT'

    def serialize(self, obj):
        return obj.value

    def deserialize(self, datum):
        return {x.value: x for x in Some}[datum]

@unique
class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1

In Spark < 1.5 a Python UDT requires a paired Scala UDT, but it looks like this is no longer the case in 1.5.

For a simple UDT like yours you can use simple types (for example IntegerType instead of a whole Struct).
