Flatten only deepest level in scala spark dataframe

Question

I have a Spark job, which has a DataFrame with the following value:

{
  "id": "abchchd",
  "test_id": "ndsbsb",
  "props": {
    "type": {
      "isMale": true,
      "id": "dd",
      "mcc": 1234,
      "name": "Adam"
    }
  }
}

{
  "id": "abc",
  "test_id": "asf",
  "props": {
    "type2": {
      "isMale": true,
      "id": "dd",
      "mcc": 12134,
      "name": "Perth"
    }
  }
}

I want to flatten it out elegantly (since the number of keys, their types, etc. are not known in advance) in such a way that props remains a struct but everything inside it is flattened, irrespective of the level of nesting.

The desired output is:

{
  "id": "abchchd",
  "test_id": "ndsbsb",
  "props": {
    "type.isMale": true,
    "type.id": "dd",
    "type.mcc": 1234,
    "type.name": "Adam"
  }
}

{
  "id": "abc",
  "test_id": "asf",
  "props": {
      "type2.isMale": true,
      "type2.id": "dd",
      "type2.mcc": 12134,
      "type2.name": "Perth"
  }
}

I used the solution mentioned in Automatically and Elegantly flatten DataFrame in Spark SQL

However, I'm unable to keep the props field intact; it gets flattened as well. Can somebody help me extend this solution?
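
For reference, the helper from that linked answer is roughly the following recursive schema walk (sketched from memory, so details may differ from the actual post). It also shows why props gets flattened away: the recursion descends into every struct, including props.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Generic flattening: recurse into every StructType and emit one column per leaf field.
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap { f =>
    val colName = if (prefix == null) f.name else s"$prefix.${f.name}"
    f.dataType match {
      case st: StructType => flattenSchema(st, colName) // descends into props too
      case _              => Array(col(colName))
    }
  }
}

// df.select(flattenSchema(df.schema): _*) therefore produces top-level columns
// such as props.type.isMale instead of keeping props as a struct.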

The final schema should be something like:

root
 |-- id: string (nullable = true)
 |-- props: struct (nullable = true)
 |    |-- type.id: string (nullable = true)
 |    |-- type.isMale: boolean (nullable = true)
 |    |-- type.mcc: long (nullable = true)
 |    |-- type.name: string (nullable = true)
 |    |-- type2.id: string (nullable = true)
 |    |-- type2.isMale: boolean (nullable = true)
 |    |-- type2.mcc: long (nullable = true)
 |    |-- type2.name: string (nullable = true)
 |-- test_id: string (nullable = true)

Answer

I've been able to achieve this with the RDD API:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, StructType}
import spark.implicits._ // for jsonRDD.toDS below

val jsonRDD = df.rdd.map{row =>
  // Turns a Row into a Map, flattening everything nested under "props" into
  // dot-separated keys while leaving "props" itself as a single nested map.
  def unnest(r: Row): Map[String, Any] = {
    r.schema.fields.zipWithIndex.flatMap{case (f, i) =>
      (f.name, f.dataType) match {
        // Special case for "props": flatten each child struct into "child.field" keys.
        case ("props", _: StructType) =>
          val propsObject = r.getAs[Row](f.name)
          Map(f.name -> propsObject.schema.fields.flatMap{propsAttr =>
            val subObject = propsObject.getAs[Row](propsAttr.name)
            subObject.schema.fields.map{subField =>
              s"${propsAttr.name}.${subField.name}" -> subObject.get(subObject.fieldIndex(subField.name))
            }
          }.toMap)
        // Any other struct: recurse and keep it nested.
        case (fname, _: StructType) => Map(fname -> unnest(r.getAs[Row](fname)))
        // Arrays of structs: recurse on every element.
        case (fname, ArrayType(_: StructType, _)) => Map(fname -> r.getAs[Seq[Row]](fname).map(unnest))
        // Anything else is kept as-is.
        case _ => Map(f.name -> r.get(i))
      }
    }
  }.toMap

  // Serialize the map back to a JSON string so Spark can re-infer the flattened schema.
  val asMap = unnest(row)
  new ObjectMapper().registerModule(DefaultScalaModule).writeValueAsString(asMap)
}

val finalDF = spark.read.json(jsonRDD.toDS).cache

The solution should accept deeply nested inputs, thanks to recursion.
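
To run the snippet end to end, df can be built from the first sample record of the question. The setup below is just one possible way to do it and is not part of the original snippet; it matches the output shown next.

import org.apache.spark.sql.SparkSession

// Hypothetical setup: a local session plus the first sample record as a one-element JSON dataset.
val spark = SparkSession.builder().master("local[*]").appName("flatten-props").getOrCreate()
import spark.implicits._

val sample = Seq(
  """{"id":"abchchd","test_id":"ndsbsb","props":{"type":{"isMale":true,"id":"dd","mcc":1234,"name":"Adam"}}}"""
)
val df = spark.read.json(sample.toDS)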

With your data, here's what we get:

finalDF.printSchema()
finalDF.show(false)
finalDF.select("props.*").show()

Output:

root
 |-- id: string (nullable = true)
 |-- props: struct (nullable = true)
 |    |-- type.id: string (nullable = true)
 |    |-- type.isMale: boolean (nullable = true)
 |    |-- type.mcc: long (nullable = true)
 |    |-- type.name: string (nullable = true)
 |-- test_id: string (nullable = true)

+-------+----------------------+-------+
|id     |props                 |test_id|
+-------+----------------------+-------+
|abchchd|[dd, true, 1234, Adam]|ndsbsb |
+-------+----------------------+-------+

+-------+-----------+--------+---------+
|type.id|type.isMale|type.mcc|type.name|
+-------+-----------+--------+---------+
|     dd|       true|    1234|     Adam|
+-------+-----------+--------+---------+

But we can also pass more nested/complex structures, for instance:

val str2 = """{"newroot":[{"mystruct":{"id":"abchchd","test_id":"ndsbsb","props":{"type":{"isMale":true,"id":"dd","mcc":1234,"name":"Adam"}}}}]}"""

...

finalDF.printSchema()
finalDF.show(false)

which gives the following output:

root
 |-- newroot: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- mystruct: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- props: struct (nullable = true)
 |    |    |    |    |-- type.id: string (nullable = true)
 |    |    |    |    |-- type.isMale: boolean (nullable = true)
 |    |    |    |    |-- type.mcc: long (nullable = true)
 |    |    |    |    |-- type.name: string (nullable = true)
 |    |    |    |-- test_id: string (nullable = true)

+---------------------------------------------+
|newroot                                      |
+---------------------------------------------+
|[[[abchchd, [dd, true, 1234, Adam], ndsbsb]]]|
+---------------------------------------------+

EDIT: As you mentioned, if you have records with different structures, you need to wrap the above subObject value in an Option.
Here's the fixed unnest function:

def unnest(r: Row): Map[String, Any] = {
  r.schema.fields.zipWithIndex.flatMap{case (f, i) =>
    (f.name, f.dataType) match {
      case ("props", _:StructType) =>
        val propsObject = r.getAs[Row](f.name)
        Map(f.name -> propsObject.schema.fields.flatMap{propsAttr =>
          // Wrap in Option: a given record may not have this particular child struct
          // under props (e.g. one record has "type", another "type2"); getAs then
          // returns null and the missing field is simply skipped.
          val subObjectOpt = Option(propsObject.getAs[Row](propsAttr.name))
          subObjectOpt.toSeq.flatMap{subObject => subObject.schema.fields.map{subField =>
            s"${propsAttr.name}.${subField.name}" -> subObject.get(subObject.fieldIndex(subField.name))
          }}
        }.toMap)
      case (fname, _: StructType) => Map(fname -> unnest(r.getAs[Row](fname)))
      case (fname, ArrayType(_: StructType,_)) => Map(fname -> r.getAs[Seq[Row]](fname).map(unnest))
      case _ => Map(f.name -> r.get(i))
    }
  }
}.toMap

The new printSchema gives:

root
 |-- id: string (nullable = true)
 |-- props: struct (nullable = true)
 |    |-- type.id: string (nullable = true)
 |    |-- type.isMale: boolean (nullable = true)
 |    |-- type.mcc: long (nullable = true)
 |    |-- type.name: string (nullable = true)
 |    |-- type2.id: string (nullable = true)
 |    |-- type2.isMale: boolean (nullable = true)
 |    |-- type2.mcc: long (nullable = true)
 |    |-- type2.name: string (nullable = true)
 |-- test_id: string (nullable = true)
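
One last usage note: because the flattened field names contain literal dots, selecting one of them individually needs backtick escaping, while expanding the whole struct works as shown earlier. A small sketch, assuming the finalDF from above:

import org.apache.spark.sql.functions.col

// "type.id" is a single field name containing a dot, so it must be backtick-escaped.
finalDF.select(col("props.`type.id`"), col("props.`type2.name`")).show()

// Expanding everything under props needs no escaping.
finalDF.select("props.*").show()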
