Merge DataFrames With Different Schemas - Scala Spark


Question

I'm transforming a JSON document into a DataFrame. As a first step I create an array of DataFrames and then union them. But I have a problem performing the union when the JSON entries have different schemas.

I can do it if the JSON entries share the same schema, as you can see in this other question: Parse JSON root in a column using Spark-Scala

I'm working with the following data:

import spark.implicits._  // provides the String Encoder required by createDataset

val exampleJsonDifferentSchema = spark.createDataset(

      """
      {"ITEM1512":
            {"name":"Yin",
             "address":{"city":"Columbus",
                        "state":"Ohio"},
             "age":28           }, 
        "ITEM1518":
            {"name":"Yang",
             "address":{"city":"Working",
                        "state":"Marc"}
                        },
        "ITEM1458":
            {"name":"Yossup",
             "address":{"city":"Macoss",
                        "state":"Microsoft"},
            "age":28
                        }
      }""" :: Nil)

As you can see, the difference is that one entry (ITEM1518) doesn't have an age field.

val itemsExampleDiff = spark.read.json(exampleJsonDifferentSchema)
itemsExampleDiff.show(false)
itemsExampleDiff.printSchema

+---------------------------------+---------------------------+-----------------------+
|ITEM1458                         |ITEM1512                   |ITEM1518               |
+---------------------------------+---------------------------+-----------------------+
|[[Macoss, Microsoft], 28, Yossup]|[[Columbus, Ohio], 28, Yin]|[[Working, Marc], Yang]|
+---------------------------------+---------------------------+-----------------------+

root
 |-- ITEM1458: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1512: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1518: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- name: string (nullable = true)

My solution so far is the following code, where I build an array of DataFrames:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

val columns: Array[String] = itemsExampleDiff.columns
var arrayOfExampleDFs: Array[DataFrame] = Array()

for (col_name <- columns) {
  // wrap each top-level item into a two-column frame: the item key and its struct value
  val temp = itemsExampleDiff.select(lit(col_name).as("Item"), col(col_name).as("Value"))
  arrayOfExampleDFs = arrayOfExampleDFs :+ temp
}

val jsonDF = arrayOfExampleDFs.reduce(_ union _)

But when the JSON entries have different schemas, the reduce with union fails, because all the DataFrames need to have the same schema. In fact, I get the following error:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types.
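
To see where the incompatibility comes from, one quick check (a sketch, assuming the arrayOfExampleDFs built above) is to compare the distinct Value schemas across the array; the struct for ITEM1518 is missing the age field:

// Inspect the struct type behind each "Value" column
arrayOfExampleDFs.map(_.schema("Value").dataType).distinct.foreach(println)
// illustrative output:
// StructType(... StructField(age,LongType,true) ...)  <- ITEM1512, ITEM1458
// StructType(... no age field ...)                    <- ITEM1518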

I'm trying to do something similar to what I found in this question: How to perform union on two DataFrames with different amounts of columns in spark?

Specifically, this part:

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}
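
For context, the linked answer then applies expr on both sides of the union. A minimal sketch (df1 and df2 stand in for any two frames):

// Align both DataFrames to the full column set before the union
val unioned = df1.select(expr(cols1, total): _*)
  .union(df2.select(expr(cols2, total): _*))

Note that this aligns top-level columns only; in this question the mismatch sits inside the nested Value struct, which is why the accepted answer below rebuilds the struct instead.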

But I can't build those column sets, because I need to capture dynamically both the total set of columns and each DataFrame's own columns. I can only do something like this:

for(i <- 0 until arrayOfExampleDFs.length-1) {

    val cols1 = arrayOfExampleDFs(i).select("Value").columns.toSet
    val cols2 = arrayOfExampleDFs(i+1).select("Value").columns.toSet
    val total = cols1 ++ cols2

    arrayOfExampleDFs(i).select("Value").printSchema()

    print(total)
}
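
This doesn't expose the real difference: Value is a single struct column, so .select("Value").columns always returns Array("Value"). A sketch of how the nested field names could be inspected instead (assuming the same arrayOfExampleDFs):

import org.apache.spark.sql.types.StructType

// The mismatch lives inside the struct, so look at its schema rather than the column list
val nestedFields = arrayOfExampleDFs(0).schema("Value").dataType match {
  case st: StructType => st.fieldNames   // e.g. Array("address", "age", "name")
  case _              => Array.empty[String]
}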

So, what could a function that performs this union dynamically look like?

Update: expected output

In this case, the expected DataFrame and schema are:

+--------+---------------------------------+
|Item    |Value                            |
+--------+---------------------------------+
|ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
|ITEM1512|[[Columbus, Ohio], 28, Yin]      |
|ITEM1518|[[Working, Marc], null, Yang]    |
+--------+---------------------------------+

root
 |-- Item: string (nullable = false)
 |-- Value: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)

Answer

Here is one possible solution, which creates a common schema for all the DataFrames by adding the age column whenever it is not found:

import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

....

for(col_name <- columns){
  val currentDf = itemsExampleDiff.select(col(col_name))

  // try to identify if age field is present
  val hasAge = currentDf.schema.fields(0)
                        .dataType
                        .asInstanceOf[StructType]
                        .fields
                        .contains(StructField("age", LongType, true))

  val valueCol = hasAge match {
    // if not construct a new value column
    case false => struct(
                    col(s"${col_name}.address"), 
                    lit(null).cast("bigint").as("age"),
                    col(s"${col_name}.name")
                  )

    case true => col(col_name)
  }

  arrayOfExampleDFs = arrayOfExampleDFs :+ currentDf.select(lit(col_name).as("Item"), valueCol.as("Value"))
}

val jsonDF = arrayOfExampleDFs.reduce(_ union _)

// +--------+---------------------------------+
// |Item    |Value                            |
// +--------+---------------------------------+
// |ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
// |ITEM1512|[[Columbus, Ohio], 28, Yin]      |
// |ITEM1518|[[Working, Marc],, Yang]         |
// +--------+---------------------------------+

Analysis: probably the most demanding part is finding out whether age is present or not. For the lookup we use the df.schema.fields property, which allows us to dig into the internal schema of each column.

When age is not found, we regenerate the column using struct:

struct(
   col(s"${col_name}.address"), 
   lit(null).cast("bigint").as("age"),
   col(s"${col_name}.name")
)
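
The same idea can be generalized beyond a hard-coded age field. Below is a hedged sketch (alignToSchema is a hypothetical helper, not part of the original answer) that fills every missing top-level struct field with a typed null so all frames converge on one target schema:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: rebuild the struct column named colName so it matches
// targetSchema, substituting a typed null for each missing top-level field.
def alignToSchema(colName: String, actual: StructType, target: StructType): Column = {
  val fields = target.fields.map { f =>
    if (actual.fieldNames.contains(f.name)) col(s"$colName.${f.name}").as(f.name)
    else lit(null).cast(f.dataType).as(f.name)
  }
  struct(fields: _*)
}

With such a helper, each entry could be built as alignToSchema(col_name, actualStruct, commonSchema).as("Value"), where commonSchema is the widest struct observed across the items; nested structs like address would need the same treatment applied recursively.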
