create schema using struct in spark scala
Question
I am new to Scala and am trying to build a custom schema from an array of elements, in order to read files based on that new custom schema.
I am reading the array from a JSON file and used the explode method to create a DataFrame with one row per element of the columns array.
val otherPeople = sqlContext.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
column_values.printSchema()
The output obtained is:
column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
root
|-- column_id: string (nullable = true)
|-- data_sensitivty: string (nullable = true)
|-- datatype: string (nullable = true)
|-- length: string (nullable = true)
|-- name: string (nullable = true)
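For reference, a minimal hypothetical input matching this printed schema (field values are made up) could be built inline instead of from a file, using Spark's createDataset:

// requires import spark.implicits._ for the String encoder
val otherPeopleDataset = spark.createDataset(
  """{"columns": [
    {"column_id": "c1", "data_sensitivty": "low", "datatype": "varchar", "length": "100", "name": "object_number"},
    {"column_id": "c2", "data_sensitivty": "low", "datatype": "decimal", "length": "17,3", "name": "hof_1"}
  ]}""" :: Nil
)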
val column_name = column_values.select("name", "datatype", "length")
column_name.show()
+------------------+--------+------+
| name|datatype|length|
+------------------+--------+------+
| object_number| varchar| 100|
| function_type| varchar| 100|
| hof_1| decimal| 17,3|
| hof_2| decimal| 17,2|
| region| varchar| 100|
| country| varchar| null|
+------------------+--------+------+
Now, for all the values listed above, I am trying to create the val schema dynamically using the code below:
import org.apache.spark.sql.types._

def getFieldType(typeName: String): DataType = typeName match {
  case "varchar" => StringType
  // TODO include other types here
  case _ => StringType
}

val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => schema.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), true)
)
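Printing this schema makes the gap concrete: because getFieldType currently maps everything to StringType, every field, including the decimals, would come out as string:

schema.printTreeString()
/*
root
 |-- object_number: string (nullable = true)
 |-- function_type: string (nullable = true)
 |-- hof_1: string (nullable = true)
 |-- hof_2: string (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
*/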
The problem with the above is that I am able to get the datatype into the struct, but I also want to capture precision and scale, only for the decimal datatype, taken from the length column. For decimals, if length is null or not present, the type needs to default to (10,0), and if the precision present is greater than 38, it needs to default to (38,0).
Answer
This approach works fine.

Below is a full example that completes your code, together with the expected result. You could introduce more variants into val data.
import org.apache.spark.sql.types._

/**
 * Parses a "precision,scale" string into a (precision, scale) tuple.
 * Defaults to (10, 0) when the value is absent and caps at (38, 0)
 * when the requested precision reaches Spark's maximum of 38.
 * @param precision Option[String]
 * @return (Int, Int)
 */
def getDecimalScale(precision: Option[String]): (Int, Int) = {
  precision match {
    case Some(pr) =>
      pr.split(",").toList match {
        case List(h, _) if h.toInt >= 38 => (38, 0)
        case List(h, t) => (h.toInt, t.toInt) // t.toInt also handles multi-digit scales such as "17,13"
        case _ => (10, 0)
      }
    case None => (10, 0)
  }
}
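A quick sanity check of the helper against the cases exercised by the sample data below:

assert(getDecimalScale(Some("17,3")) == (17, 3))
assert(getDecimalScale(Some("39,2")) == (38, 0)) // precision above 38 is capped
assert(getDecimalScale(None) == (10, 0))         // missing length falls back to the default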
val data = List(("object_number", "varchar", "100"), ("function_type", "varchar", "100"),
("hof_1", "decimal", "17,3"), ("hof_2", "decimal", "17,2"),
("hof_3", "decimal", null),("hof_4", "decimal", "39,2"),
("region", "varchar", "100"), ("country", "varchar", null))
import spark.implicits._
val column_name = sc.parallelize(data).toDF("name","datatype","length")
column_name.show()
/*
+-------------+--------+------+
| name|datatype|length|
+-------------+--------+------+
|object_number| varchar| 100|
|function_type| varchar| 100|
| hof_1| decimal| 17,3|
| hof_2| decimal| 17,2|
| hof_3| decimal| null|
| hof_4| decimal| 39,2|
| region| varchar| 100|
| country| varchar| null|
+-------------+--------+------+
*/
val schemaColumns = column_name.collect()
schemaColumns.foreach(println)
/*
[object_number,varchar,100]
[function_type,varchar,100]
[hof_1,decimal,17,3]
[hof_2,decimal,17,2]
[hof_3,decimal,null]
[hof_4,decimal,39,2]
[region,varchar,100]
[country,varchar,null]
*/
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => {
    columnRow.getAs[String]("datatype") match {
      case "varchar" => schema.add(columnRow.getAs[String]("name"), StringType, true)
      case "decimal" =>
        // derive precision and scale from the length column, with defaults applied
        val (precision, scale) = getDecimalScale(Option(columnRow.getAs[String]("length")))
        schema.add(columnRow.getAs[String]("name"), new DecimalType(precision = precision, scale = scale), true)
      case _ => schema.add(columnRow.getAs[String]("name"), StringType, true)
    }
  }
)
schema.printTreeString()
/*
root
|-- object_number: string (nullable = true)
|-- function_type: string (nullable = true)
|-- hof_1: decimal(17,3) (nullable = true)
|-- hof_2: decimal(17,2) (nullable = true)
|-- hof_3: decimal(10,0) (nullable = true)
|-- hof_4: decimal(38,0) (nullable = true)
|-- region: string (nullable = true)
|-- country: string (nullable = true)
*/
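With the schema built dynamically, it can be applied when reading the data files the question mentions; a minimal sketch, assuming CSV input at a hypothetical path:

val df = spark.read
  .option("header", "true")
  .schema(schema)              // apply the dynamically built StructType
  .csv("/path/to/data_files")  // hypothetical location

df.printSchema()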