Flatten a DataFrame in Scala with different DataTypes inside


Problem Description


As you may know, a DataFrame can contain fields which are complex types, like structures (StructType) or arrays (ArrayType). You may need, as in my case, to map all the DataFrame data to a Hive table, with simple type fields (String, Integer...). I've been struggling with this issue for a long time, and I've finally found a solution I want to share. Also, I'm sure it could be improved, so feel free to reply with your own suggestions.


It's based on this thread, but it also works for ArrayType elements, not only StructType ones. It is a tail-recursive function that receives a DataFrame and returns it flattened.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{ArrayType, StructType}

def flattenDf(df: DataFrame): DataFrame = {
  var end = false
  var i = 0
  val fields = df.schema.fields
  val fieldNames = fields.map(f => f.name)
  val fieldsNumber = fields.length

  while (!end) {
    val field = fields(i)
    val fieldName = field.name

    field.dataType match {
      case st: StructType =>
        // Replace the struct column with its child columns and recurse
        val childFieldNames = st.fieldNames.map(n => fieldName + "." + n)
        val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
        val newDf = df.selectExpr(newFieldNames: _*)
        return flattenDf(newDf)
      case at: ArrayType =>
        // Explode the array into one row per element, then expand the element
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode($fieldName) as a")
        val fieldNamesToSelect = fieldNamesExcludingArray ++ Array("a.*")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        val explodedAndSelectedDf = explodedDf.selectExpr(fieldNamesToSelect: _*)
        return flattenDf(explodedAndSelectedDf)
      case _ => () // simple type: nothing to flatten, move on to the next field
    }

    i += 1
    end = i >= fieldsNumber
  }
  df
}
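
For reference, a minimal usage sketch of the function above. The case classes, field names, and the `spark` session are illustrative assumptions; the array elements are structs here, since the `a.*` step star-expands the exploded element:

import spark.implicits._

// Illustrative nested shape: a struct column and an array-of-structs column
case class Point(x: Int, y: Int)
case class Item(sku: String, qty: Int)
case class Record(id: String, location: Point, items: Seq[Item])

val nestedDf = Seq(
  Record("1", Point(2, 3), Seq(Item("a", 4), Item("b", 5)))
).toDF()

val flatDf = flattenDf(nestedDf)
flatDf.printSchema  // expected: id, x, y, sku, qty -- all simple types
flatDf.show()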

Recommended Answer

val df = Seq(("1", (2, (3, 4)),Seq(1,2))).toDF()

df.printSchema

root
 |-- _1: string (nullable = true)
 |-- _2: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: struct (nullable = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: integer (nullable = false)
 |-- _3: array (nullable = true)
 |    |-- element: integer (containsNull = false)


import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}

def flattenSchema(schema: StructType, fieldName: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val cols = if (fieldName == null) f.name else fieldName + "." + f.name
    f.dataType match {
      case structType: StructType => flattenSchema(structType, cols) // recurse into nested structs
      case arrayType: ArrayType => Array(explode(col(cols)))         // one row per array element
      case _ => Array(col(cols))                                     // simple type: select as-is
    }
  })
}

df.select(flattenSchema(df.schema) :_*).printSchema

root
 |-- _1: string (nullable = true)
 |-- _1: integer (nullable = true)
 |-- _1: integer (nullable = true)
 |-- _2: integer (nullable = true)
 |-- col: integer (nullable = false)
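
Note that the flattened schema keeps only the leaf names, so `_1` appears three times above; a Hive table would reject duplicate column names. One possible tweak, in the spirit of the question's request for improvements, is to alias each leaf with its full dotted path. This is only a sketch; the helper name and the underscore-based naming are assumptions, not part of the original answer:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}

// Same traversal as flattenSchema, but each leaf is aliased with its full
// path (dots replaced by underscores) so the flattened names stay unique.
def flattenSchemaWithNames(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap { f =>
    val path = if (prefix == null) f.name else prefix + "." + f.name
    val alias = path.replace(".", "_")
    f.dataType match {
      case st: StructType => flattenSchemaWithNames(st, path)
      case _: ArrayType   => Array(explode(col(path)).as(alias))
      case _              => Array(col(path).as(alias))
    }
  }
}

// df.select(flattenSchemaWithNames(df.schema): _*).printSchema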
