How to compare two StructTypes in Scala and change the datatype of columns?


Problem description

I am trying to move data from GP to HDFS using Scala & Spark.

val execQuery = "select * from schema.tablename"

val yearDF = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", s"(${execQuery}) as year2016")
  .option("user", devUserName)
  .option("password", devPassword)
  .option("partitionColumn", "header_id")
  .option("lowerBound", 19919927)
  .option("upperBound", 28684058)
  .option("numPartitions", 30)
  .load()

val yearDFSchema = yearDF.schema

The schema for yearDF is:

root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: decimal(38,30) (nullable = true)
 |-- release_number: decimal(38,30) (nullable = true)
 |-- change_number: decimal(38,30) (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: decimal(15,0) (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

The schema of same table on hive which is given by our project:

val hiveColumns = "source_system_name:String|description:String|creation_date:Timestamp|status:String|status_date:Timestamp|table_refresh_delay_min:Timestamp|release_number:Double|change_number:Double|interface_queue_enabled_flag:String|rework_enabled_flag:String|fdm_application_id:Bigint|history_enabled_flag:String"

So I took hiveColumns and created a new StructType as given below:

import org.apache.spark.sql.types._

def convertDatatype(datatype: String): DataType =
  datatype.toLowerCase match {
    case "string"    => StringType
    case "bigint"    => LongType
    case "int"       => IntegerType
    case "double"    => DoubleType
    case "date"      => TimestampType
    case "boolean"   => BooleanType
    case "timestamp" => TimestampType
  }


val schemaList = hiveColumns.split("\\|")
val newSchema  = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
newSchema.printTreeString()

root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: double (nullable = true)
 |-- release_number: double (nullable = true)
 |-- change_number: double (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: long (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

When I try to apply my new schema newSchema to yearDF, I get the following exception:

 Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double

The exception occurs because a decimal is being converted to a double. What I don't understand is how to convert the datatype of the columns table_refresh_delay_min, release_number, change_number, and fdm_application_id in newSchema to the corresponding datatypes present in yearDF's schema. That is:

If a column in yearDFSchema has a decimal datatype with precision greater than zero, decimal(38,30) in this case, I need to convert that same column's datatype in newSchema to DecimalType(38,30).

Could anyone let me know how I can achieve this?

Recommended answer

Errors like this occur when you try to apply a schema to an RDD[Row] using the Developer's API functions:

def createDataFrame(rows: List[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

In such cases the stored data types have to match the external (i.e. the value type in Scala) data types, as listed in the official Spark SQL data types reference, and no type casting or coercion is applied.

Therefore it is your responsibility as a user to ensure that the data and the schema are compatible.
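
For illustration, here is a minimal sketch that reproduces the mismatch (assuming a running SparkSession named spark): the stored value is a java.math.BigDecimal while the schema declares DoubleType, so evaluation fails with exactly the error quoted above.

import java.math.BigDecimal
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical reproduction: a BigDecimal value against a DoubleType schema.
val rows   = spark.sparkContext.parallelize(Seq(Row(new BigDecimal("1.5"))))
val schema = StructType(Seq(StructField("release_number", DoubleType)))

// No coercion is applied, so this fails at evaluation time with:
// "java.math.BigDecimal is not a valid external type for schema of double"
spark.createDataFrame(rows, schema).show()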

The description of the problem you've provided indicates a rather different scenario, one that calls for a CAST. Let's create a dataset with exactly the same schema as in your example:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val yearDF = spark.createDataFrame(
  sc.parallelize(Seq[Row]()),
  StructType(Seq(
    StructField("source_system_name", StringType),
    StructField("table_refresh_delay_min", DecimalType(38, 30)),
    StructField("release_number", DecimalType(38, 30)),
    StructField("change_number", DecimalType(38, 30)),
    StructField("interface_queue_enabled_flag", StringType),
    StructField("rework_enabled_flag", StringType),
    StructField("fdm_application_id", DecimalType(15, 0)),
    StructField("history_enabled_flag", StringType)
)))

yearDF.printSchema

root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: decimal(38,30) (nullable = true)
 |-- release_number: decimal(38,30) (nullable = true)
 |-- change_number: decimal(38,30) (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: decimal(15,0) (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

and the desired types, for example:

val dtypes = Seq(
  "source_system_name" -> "string",
  "table_refresh_delay_min" -> "double",
  "release_number" -> "double",
  "change_number" -> "double",
  "interface_queue_enabled_flag" -> "string",
  "rework_enabled_flag" -> "string",
  "fdm_application_id" -> "long",
  "history_enabled_flag" -> "string"
)

Then you can simply map over the columns and cast:

import org.apache.spark.sql.functions.col

val mapping = dtypes.toMap

yearDF.select(yearDF.columns.map { c => col(c).cast(mapping(c)) }: _*).printSchema

root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: double (nullable = true)
 |-- release_number: double (nullable = true)
 |-- change_number: double (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: long (nullable = true)
 |-- history_enabled_flag: string (nullable = true)
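
Bear in mind that casting decimal(38,30) to double can silently lose precision; if exact values matter, keep the DecimalType rather than casting it away.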

This of course assumes that the actual and desired types are compatible and that CAST is allowed.
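
If you would rather derive the target types by comparing the two StructTypes directly, as the question title suggests, a sketch along these lines should work (jdbcTypes and targetTypes are illustrative names; the rule keeps yearDF's DecimalType wherever it reports a non-zero scale, and the Hive-derived type everywhere else):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Look up the type the JDBC source reported for each column.
val jdbcTypes: Map[String, DataType] =
  yearDF.schema.fields.map(f => f.name -> f.dataType).toMap

// Prefer the JDBC DecimalType (e.g. decimal(38,30)) over the Hive-derived type.
val targetTypes: Seq[(String, DataType)] = newSchema.fields.toSeq.map { f =>
  jdbcTypes.get(f.name) match {
    case Some(d: DecimalType) if d.scale > 0 => f.name -> d
    case _                                   => f.name -> f.dataType
  }
}

val castedDF = yearDF.select(targetTypes.map { case (c, t) => col(c).cast(t) }: _*)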

If you still experience problems due to the peculiarities of a specific JDBC driver, you should consider placing the cast directly in the query, either manually (see In Apache Spark 2.0.0, is it possible to fetch a query from an external database (rather than grab the whole table)?):

val externalDtypes = Seq(
  "source_system_name" -> "text",
  "table_refresh_delay_min" -> "double precision",
  "release_number" -> "float8",
  "change_number" -> "float8",
  "interface_queue_enabled_flag" -> "text",
  "rework_enabled_flag" -> "text",
  "fdm_application_id" -> "bigint",
  "history_enabled_flag" -> "text"
)

// Build the SELECT list with the casts applied on the database side
val fields = externalDtypes.map {
  case (c, t) => s"CAST($c AS $t)"
}.mkString(", ")

val dbTable = s"""(select $fields from schema.tablename) as tmp"""
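
A usage sketch showing where the generated subquery plugs in (the connection options are assumed from the question):

// Assumes connectionUrl, devUserName and devPassword as defined in the question.
val castedOnRead = spark.read
  .format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", dbTable) // the subquery with database-side casts
  .option("user", devUserName)
  .option("password", devPassword)
  .load()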

or through a custom schema:

spark.read
  .format("jdbc")
  .option(
    "customSchema",
    dtypes.map { case (c, t) => s"`$c` $t" } .mkString(", "))
  ...
  .load()
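
Note that the customSchema option of the JDBC data source is only available in Spark 2.3.0 and later, so this variant won't work on older clusters.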
