How to compare two StructTypes in Scala and change the datatype of columns?
Question
I am trying to move data from GP to HDFS using Scala & Spark.
val execQuery = "select * from schema.tablename"
val yearDF = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", s"(${execQuery}) as year2016")
  .option("user", devUserName)
  .option("password", devPassword)
  .option("partitionColumn", "header_id")
  .option("lowerBound", 19919927)
  .option("upperBound", 28684058)
  .option("numPartitions", 30)
  .load()
val yearDFSchema = yearDF.schema
The schema for yearDF is:
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: decimal(38,30) (nullable = true)
|-- release_number: decimal(38,30) (nullable = true)
|-- change_number: decimal(38,30) (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: decimal(15,0) (nullable = true)
|-- history_enabled_flag: string (nullable = true)
The schema of the same table on Hive, as given by our project:
val hiveColumns = "source_system_name:String|description:String|creation_date:Timestamp|status:String|status_date:Timestamp|table_refresh_delay_min:Timestamp|release_number:Double|change_number:Double|interface_queue_enabled_flag:String|rework_enabled_flag:String|fdm_application_id:Bigint|history_enabled_flag:String"
So I took hiveColumns and created a new StructType from it, as given below:
import org.apache.spark.sql.types._

def convertDatatype(datatype: String): DataType = {
  // Normalize the case so "String", "Bigint", "Timestamp" etc. also match
  datatype.toLowerCase match {
    case "string"    => StringType
    case "bigint"    => LongType
    case "int"       => IntegerType
    case "double"    => DoubleType
    case "date"      => TimestampType
    case "boolean"   => BooleanType
    case "timestamp" => TimestampType
  }
}

val schemaList = hiveColumns.split("\\|")
val newSchema = StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), nullable = true)))
newSchema.printTreeString()
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: double (nullable = true)
|-- release_number: double (nullable = true)
|-- change_number: double (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: long (nullable = true)
|-- history_enabled_flag: string (nullable = true)
When I try to apply the new schema newSchema on yearDF, I get the following exception:
Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
The exception occurs due to the conversion of decimal to double.
What I don't understand is how to convert the datatype of the columns table_refresh_delay_min, release_number, change_number and fdm_application_id in the StructType newSchema from DoubleType to their corresponding datatypes present in yearDF's schema. i.e.
If a column in yearDFSchema has a decimal datatype with precision greater than zero, in this case decimal(38,30), I need to convert the same column's datatype in newSchema to DecimalType(38,30).
Could anyone let me know how I can achieve this?
Answer
Errors like this occur when you try to apply a schema to an RDD[Row] using the Developer's API functions:
def createDataFrame(rows: List[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
In such cases the stored data types have to match the external (i.e. value type in Scala) data types as listed in the official SQL types mapping, and no type casting or coercion is applied.
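A minimal illustration of the mismatch, in plain Scala with no Spark required: the JDBC source hands back a java.math.BigDecimal for a decimal column, and that value is not an instance of the external type java.lang.Double that a DoubleType schema expects. The runtime check Spark performs is, in spirit, this kind of instanceof test:

```scala
// What the JDBC source actually delivers for a decimal(38,30) column:
val stored: Any = new java.math.BigDecimal("1.500000000000000000000000000000")

// A schema declaring DoubleType expects java.lang.Double as the external type;
// a BigDecimal does not satisfy that, hence the "not a valid external type" error.
val matchesDouble = stored.isInstanceOf[java.lang.Double]

println(matchesDouble)  // false
```

No conversion is attempted here: the check fails outright rather than coercing the BigDecimal into a Double.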
Therefore it is your responsibility as a user to ensure that the data and schema are compatible.
The description of the problem you've provided indicates a rather different scenario, one that calls for a CAST. Let's create a dataset with exactly the same schema as in your example:
val yearDF = spark.createDataFrame(
  sc.parallelize(Seq[Row]()),
  StructType(Seq(
    StructField("source_system_name", StringType),
    StructField("table_refresh_delay_min", DecimalType(38, 30)),
    StructField("release_number", DecimalType(38, 30)),
    StructField("change_number", DecimalType(38, 30)),
    StructField("interface_queue_enabled_flag", StringType),
    StructField("rework_enabled_flag", StringType),
    StructField("fdm_application_id", DecimalType(15, 0)),
    StructField("history_enabled_flag", StringType)
  )))
yearDF.printSchema
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: decimal(38,30) (nullable = true)
|-- release_number: decimal(38,30) (nullable = true)
|-- change_number: decimal(38,30) (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: decimal(15,0) (nullable = true)
|-- history_enabled_flag: string (nullable = true)
and the desired types, for example:
val dtypes = Seq(
"source_system_name" -> "string",
"table_refresh_delay_min" -> "double",
"release_number" -> "double",
"change_number" -> "double",
"interface_queue_enabled_flag" -> "string",
"rework_enabled_flag" -> "string",
"fdm_application_id" -> "long",
"history_enabled_flag" -> "string"
)
Then you can just map:
val mapping = dtypes.toMap
yearDF.select(yearDF.columns.map { c => col(c).cast(mapping(c)) }: _*).printSchema
root
|-- source_system_name: string (nullable = true)
|-- table_refresh_delay_min: double (nullable = true)
|-- release_number: double (nullable = true)
|-- change_number: double (nullable = true)
|-- interface_queue_enabled_flag: string (nullable = true)
|-- rework_enabled_flag: string (nullable = true)
|-- fdm_application_id: long (nullable = true)
|-- history_enabled_flag: string (nullable = true)
This of course assumes that the actual and desired types are compatible and that a CAST between them is allowed.
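Rather than writing the dtypes list by hand, the cast targets can be derived by comparing the two schemas, which is what the question is really after. Below is a sketch at the string level, assuming both schemas are available as (name, typeName) pairs, e.g. obtained via df.schema.fields.map(f => f.name -> f.dataType.simpleString); the column names and type pairs here are a hypothetical subset, and the rule applied is the one from the question: keep a decimal source type whenever its scale is non-zero, otherwise fall back to the Hive type.

```scala
// Hypothetical (name -> simpleString) views of the JDBC and Hive schemas
val jdbcTypes = Seq(
  "release_number"       -> "decimal(38,30)",
  "fdm_application_id"   -> "decimal(15,0)",
  "history_enabled_flag" -> "string"
)
val hiveTypes = Map(
  "release_number"       -> "double",
  "fdm_application_id"   -> "bigint",
  "history_enabled_flag" -> "string"
)

// Extracts precision and scale from a "decimal(p,s)" type string
val Decimal = """decimal\((\d+),(\d+)\)""".r

// Keep the source decimal when its scale is > 0, otherwise take the Hive type
val derived = jdbcTypes.map {
  case (c, t @ Decimal(_, scale)) if scale.toInt > 0 => c -> t
  case (c, _)                                        => c -> hiveTypes(c)
}
// derived: Seq(("release_number", "decimal(38,30)"),
//              ("fdm_application_id", "bigint"),
//              ("history_enabled_flag", "string"))
```

The resulting pairs can then be fed into the same cast-per-column select as above.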
If you still experience problems due to peculiarities of a specific JDBC driver, you should consider placing the cast directly in the query, either manually (see In Apache Spark 2.0.0, is it possible to fetch a query from an external database (rather than grab the whole table)?):
val externalDtypes = Seq(
  "source_system_name" -> "text",
  "table_refresh_delay_min" -> "double precision",
  "release_number" -> "float8",
  "change_number" -> "float8",
  "interface_queue_enabled_flag" -> "text",
  "rework_enabled_flag" -> "text",
  "fdm_application_id" -> "bigint",
  "history_enabled_flag" -> "text"
)

val fields = externalDtypes.map {
  case (c, t) => s"CAST(`$c` AS $t)"
}.mkString(", ")

val dbTable = s"""(select $fields from schema.tablename) as tmp"""
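To make the string-building concrete, here is a self-contained sketch with a hypothetical two-column subset, showing what ends up in dbTable:

```scala
// Hypothetical subset of columns with their database-side types
val externalTypes = Seq(
  "release_number"     -> "float8",
  "fdm_application_id" -> "bigint"
)

// Build the CAST(...) projection list for the pushed-down subquery
val fields = externalTypes.map {
  case (c, t) => s"CAST(`$c` AS $t)"
}.mkString(", ")

val dbTable = s"(select $fields from schema.tablename) as tmp"
// dbTable: (select CAST(`release_number` AS float8), CAST(`fdm_application_id` AS bigint) from schema.tablename) as tmp
```

The resulting string is what gets passed as the dbtable option, so the casts run inside the database before Spark sees the data.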
or by using a custom schema:
spark.read
.format("jdbc")
.option(
"customSchema",
dtypes.map { case (c, t) => s"`$c` $t" } .mkString(", "))
...
.load()
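For reference, the customSchema value built from the dtypes pairs above is just a DDL-style string; a quick sketch with a hypothetical two-column subset (whether Spark accepts a given type name there is up to its DDL parser):

```scala
// Hypothetical subset of (column -> Spark SQL type) pairs
val dtypes = Seq(
  "release_number"     -> "double",
  "fdm_application_id" -> "long"
)

// Build the DDL-style string expected by the customSchema JDBC option
val customSchema = dtypes.map { case (c, t) => s"`$c` $t" }.mkString(", ")
// customSchema: `release_number` double, `fdm_application_id` long
```

Unlike the CAST-in-query approach, customSchema only changes the types Spark uses when reading the columns; the query sent to the database is unchanged.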