在Spark Scala中使用struct创建架构 [英] create schema using struct in spark scala

查看:167
本文介绍了在Spark Scala中使用struct创建架构的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是scala的新手,它试图从元素数组中创建自定义架构,以基于新的自定义架构读取文件.

I am new to scala and trying to make custom schema from array of elements to read files based on a new custom schema.

我正在从json文件读取数组,并使用了explode方法,并为列数组中的每个元素创建了一个数据框.

I am reading the arrays from json file and used explode method and created a dataframe for each element in column array.

val otherPeople = sqlContext.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
column_values.printSchema()

获得的输出是:

column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
root
 |-- column_id: string (nullable = true)
 |-- data_sensitivty: string (nullable = true)
 |-- datatype: string (nullable = true)
 |-- length: string (nullable = true)
 |-- name: string (nullable = true)

val column_values = ddb_schema.withColumn("columns", explode($"columns")).select("columns.*")
val column_name = column_values.select("name", "datatype", "length")

column_name.show(4)


 +------------------+--------+------+
 |              name|datatype|length|
 +------------------+--------+------+
 |     object_number| varchar|   100|
 |     function_type| varchar|   100|
 |             hof_1| decimal|  17,3|
 |             hof_2| decimal|  17,2|
 |            region| varchar|   100|
 |           country| varchar|  null|
 +------------------+--------+------+

现在对于上面列出的所有值,我正在尝试使用下面的方法动态创建val模式代码

Now for all the values listed above i am trying to creating val schema dynamically using below code

val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => schema.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), true)
  )

def getFieldType(typeName: String): DataType = typeName match {
    case "varchar" => StringType
    // TODO include other types here
    case _ => StringType
  }

上面的

问题是我能够在struct中获取数据类型,,但我也想仅针对十进制数据类型(比例条件为max,条件为长度允许)获取(小数位数和精度)对于小数,如果为null或不存在,则需要将默认值设为(10,0),如果存在的值大于38,则需要将默认值设为(38,0)

推荐答案

这种方法效果很好.

我向您展示了一个完整的示例,该示例可完善您的代码和预期结果.

I show you a full example that completes your code and the expected result.

您可以在 val数据中引入更多变体.

You could introduce more variants into val data.

  /**
    * to obtain a tuple with precision and scale
    * @param precision Option[String]
    * @return (Int, Int)
    */
  def getDecimalScale(precision: Option[String]): (Int, Int) = {
    precision match {
      case Some(pr) => {
        pr.split(",").toList match {
          case List(h, _) if h.toInt >= 38 => (38,0)
          case List(h, t) => (h.toInt,t.head.toString.toInt)
          case _ => (10, 0)
        }
      }
      case None => (10, 0)
    }
  }

    val data = List(("object_number", "varchar", "100"), ("function_type", "varchar", "100"),
      ("hof_1", "decimal", "17,3"), ("hof_2", "decimal", "17,2"),
      ("hof_3", "decimal", null),("hof_4", "decimal", "39,2"),
      ("region", "varchar", "100"), ("country", "varchar", null))

    import spark.implicits._

    val column_name = sc.parallelize(data).toDF("name","datatype","length")

    column_name.show()
/*
+-------------+--------+------+
|         name|datatype|length|
+-------------+--------+------+
|object_number| varchar|   100|
|function_type| varchar|   100|
|        hof_1| decimal|  17,3|
|        hof_2| decimal|  17,2|
|        hof_3| decimal|  null|
|        hof_4| decimal|  39,2|
|       region| varchar|   100|
|      country| varchar|  null|
+-------------+--------+------+
*/

    val schemaColumns = column_name.collect()
    schemaColumns.foreach(println)
/*
[object_number,varchar,100]
[function_type,varchar,100]
[hof_1,decimal,17,3]
[hof_2,decimal,17,2]
[hof_3,decimal,null]
[hof_4,decimal,39,2]
[region,varchar,100]
[country,varchar,null]
*/

    val schema = schemaColumns.foldLeft(new StructType())(
      (schema, columnRow) => {
        columnRow.getAs[String]("datatype") match {
          case "varchar" => schema.add(columnRow.getAs[String]("name"), StringType, true)
          case "decimal" => {
            val (pr, sc) = getDecimalScale(Option(columnRow.getAs[String]("length")))
            schema.add(columnRow.getAs[String]("name"), new DecimalType(precision = pr, scale = sc), true)
          }
          case _ => schema.add(columnRow.getAs[String]("name"), StringType, true)
        }
      }
    )

    schema.printTreeString()
/*
root
 |-- object_number: string (nullable = true)
 |-- function_type: string (nullable = true)
 |-- hof_1: decimal(17,3) (nullable = true)
 |-- hof_2: decimal(17,2) (nullable = true)
 |-- hof_3: decimal(10,0) (nullable = true)
 |-- hof_4: decimal(38,0) (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
*/

这篇关于在Spark Scala中使用struct创建架构的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆