How to define schema for custom type in Spark SQL?

Problem description
The following example code tries to put some case objects into a DataFrame. The code includes the definition of a case object hierarchy and a case class using this trait:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data(name: String, t: Some)

object Example {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Example")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val df = sc.parallelize(Seq(Data("a", AType), Data("b", BType)), 4).toDF()
    df.show()
  }
}
When executing the code, I unfortunately encounter the following exception:
java.lang.UnsupportedOperationException: Schema for type Some is not supported
Questions

- Is there a possibility to add or define a schema for certain types (here type Some)?
- Does another approach exist to represent this kind of enumeration?

I tried to use Enumeration directly, but also without success (see below).

Code for the Enumeration attempt:
object Some extends Enumeration {
type Some = Value
val AType, BType = Value
}
Thanks in advance. I hope that the best approach is not to use strings.
Spark 2.0.0+:

UserDefinedType has been made private in Spark 2.0.0 and, as of now, it has no Dataset-friendly replacement.

See: SPARK-14155 (Hide UserDefinedType in Spark 2.0)

Most of the time a statically typed Dataset can serve as a replacement. There is a pending Jira, SPARK-7768, to make the UDT API public again, with target version 2.4.

See also How to store custom objects in Dataset?
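Since UserDefinedType is private in 2.0+, a common workaround is to store the enum as its underlying integer (or string) value and convert at the application boundary, so Spark only ever sees types it can derive a schema for. A minimal sketch of that mapping in Python (plain stdlib, no Spark required; the names Some, AType, BType mirror the Scala example):

```python
from enum import Enum, unique

@unique
class Some(Enum):
    AType = 0
    BType = 1

# Before building a DataFrame/Dataset, replace the enum with its value ...
def to_row(name, t):
    return (name, t.value)

# ... and map the value back to the enum member after collecting results.
def from_row(name, value):
    return (name, Some(value))

rows = [to_row("a", Some.AType), to_row("b", Some.BType)]
# rows == [("a", 0), ("b", 1)] -- plain types Spark can derive a schema for
restored = [from_row(n, v) for n, v in rows]
```

The same pattern works in Scala by mapping the case objects to an Int (or String) field in the case class before calling toDF/toDS.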
Spark < 2.0.0
Is there a possibility to add or define a schema for certain types (here type Some)?
I guess the answer depends on how badly you need this. It looks like it is possible to create a UserDefinedType, but it requires access to the DeveloperApi and is not exactly straightforward or well documented.
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some

class SomeUDT extends UserDefinedType[Some] {
  override def sqlType: DataType = IntegerType

  override def serialize(obj: Any) = {
    obj match {
      case AType => 0
      case BType => 1
    }
  }

  override def deserialize(datum: Any): Some = {
    datum match {
      case 0 => AType
      case 1 => BType
    }
  }

  override def userClass: Class[Some] = classOf[Some]
}
You should probably override hashCode and equals as well.
Its PySpark counterpart can look like this:
from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType

class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(self):
        return IntegerType()

    @classmethod
    def module(cls):
        return cls.__module__

    @classmethod
    def scalaUDT(cls):  # Required in Spark < 1.5
        return 'net.zero323.enum.SomeUDT'

    def serialize(self, obj):
        return obj.value

    def deserialize(self, datum):
        return {x.value: x for x in Some}[datum]

@unique
class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1
In Spark < 1.5 a Python UDT requires a paired Scala UDT, but it looks like this is no longer the case in 1.5.
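The serialize / deserialize pair in the PySpark UDT above is just an int-to-enum mapping, so its logic can be sanity-checked without a Spark installation. A minimal stand-in sketch (pure stdlib; SomeUDT here is a plain class, not the real pyspark UserDefinedType):

```python
from enum import Enum, unique

@unique
class Some(Enum):
    AType = 0
    BType = 1

class SomeUDT:
    """Stand-in mirroring the serialize/deserialize logic of the UDT above."""

    def serialize(self, obj):
        # Enum member -> underlying integer stored in the IntegerType column
        return obj.value

    def deserialize(self, datum):
        # Integer read back from the column -> enum member
        return {x.value: x for x in Some}[datum]

udt = SomeUDT()
# Round trip: every member survives serialize -> deserialize unchanged
assert all(udt.deserialize(udt.serialize(x)) is x for x in Some)
```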
For a simple UDT like yours you can use simple types (for example IntegerType instead of a whole Struct).