Merge Dataframes With Different Schemas - Scala Spark

Problem Description

I'm working on transforming a JSON document into a Data Frame. In the first step I create an Array of Data Frames, and after that I perform a union. But I have a problem doing the union when the JSON objects have different schemas.

I can do it when the JSON objects have the same schema, as you can see in this other question: Parse JSON root in a column using Spark-Scala.

I'm working with the following data:

import spark.implicits._  // provides the String encoder needed by createDataset

val exampleJsonDifferentSchema = spark.createDataset(

      """
      {"ITEM1512":
            {"name":"Yin",
             "address":{"city":"Columbus",
                        "state":"Ohio"},
             "age":28           }, 
        "ITEM1518":
            {"name":"Yang",
             "address":{"city":"Working",
                        "state":"Marc"}
                        },
        "ITEM1458":
            {"name":"Yossup",
             "address":{"city":"Macoss",
                        "state":"Microsoft"},
            "age":28
                        }
      }""" :: Nil)

As you can see, the difference is that one of the Data Frames (ITEM1518) doesn't have an age field.

val itemsExampleDiff = spark.read.json(exampleJsonDifferentSchema)
itemsExampleDiff.show(false)
itemsExampleDiff.printSchema

+---------------------------------+---------------------------+-----------------------+
|ITEM1458                         |ITEM1512                   |ITEM1518               |
+---------------------------------+---------------------------+-----------------------+
|[[Macoss, Microsoft], 28, Yossup]|[[Columbus, Ohio], 28, Yin]|[[Working, Marc], Yang]|
+---------------------------------+---------------------------+-----------------------+

root
 |-- ITEM1458: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1512: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1518: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- name: string (nullable = true)

My current solution is the following code, where I build an array of DataFrames, one per item column:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

val columns: Array[String] = itemsExampleDiff.columns
var arrayOfExampleDFs: Array[DataFrame] = Array()

for(col_name <- columns){

  val temp = itemsExampleDiff.select(lit(col_name).as("Item"), col(col_name).as("Value"))

  arrayOfExampleDFs = arrayOfExampleDFs :+ temp
}

val jsonDF = arrayOfExampleDFs.reduce(_ union _)

But when the JSON objects have different schemas, the reduce over the union fails, because all the Data Frames need to have the same schema. In fact, I get the following error:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types.
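
A quick way to see which Value schemas actually differ before reducing is to compare them across the array (a sketch, not part of the original post):

arrayOfExampleDFs
  .map(_.schema("Value").dataType.simpleString)
  .distinct
  .foreach(println)

// struct<address:struct<city:string,state:string>,age:bigint,name:string>
// struct<address:struct<city:string,state:string>,name:string>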

I'm trying to do something similar to what I found in this question: How to perform union on two DataFrames with different amounts of columns in spark?

In particular, this part:

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}
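
For reference, the linked answer then applies expr to both sides, so that each Data Frame exposes the full column set before the union (a sketch, assuming flat Data Frames df1 and df2 as in that question):

val unioned = df1.select(expr(cols1, total): _*)
  .union(df2.select(expr(cols2, total): _*))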

But I can't build those column sets, because I need to pick up the columns dynamically, both the full set and the ones of each item. I can only do something like this:

for(i <- 0 until arrayOfExampleDFs.length-1) {

    val cols1 = arrayOfExampleDFs(i).select("Value").columns.toSet
    val cols2 = arrayOfExampleDFs(i+1).select("Value").columns.toSet
    val total = cols1 ++ cols2

    arrayOfExampleDFs(i).select("Value").printSchema()

    print(total)
}
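
The catch is that select("Value").columns only returns the top-level Value column, not the fields inside the struct. One way to pick up the nested field names dynamically is to reach into the struct type behind the Value column (a sketch, not from the original post):

import org.apache.spark.sql.types.StructType

// Field names inside the Value struct, e.g. Array(address, age, name)
val valueFields: Array[String] = arrayOfExampleDFs(0)
  .schema("Value")
  .dataType
  .asInstanceOf[StructType]
  .fieldNames

// Equivalent shortcut: flatten the struct and read the resulting columns
val flattened: Array[String] = arrayOfExampleDFs(0).select("Value.*").columns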

So, what would a function that performs this union dynamically look like?

Update: Expected Output

In this case, this is the expected Data Frame and schema:

+--------+---------------------------------+
|Item    |Value                            |
+--------+---------------------------------+
|ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
|ITEM1512|[[Columbus, Ohio], 28, Yin]      |
|ITEM1518|[[Working, Marc], null, Yang]    |
+--------+---------------------------------+

root
 |-- Item: string (nullable = false)
 |-- Value: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)

Answer

Here is one possible solution, which creates a common schema for all the Data Frames by adding the age field when it is not found:

import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

....

for(col_name <- columns){
  val currentDf = itemsExampleDiff.select(col(col_name))

  // try to identify if age field is present
  val hasAge = currentDf.schema.fields(0)
                        .dataType
                        .asInstanceOf[StructType]
                        .fields
                        .contains(StructField("age", LongType, true))

  val valueCol = hasAge match {
    // if not construct a new value column
    case false => struct(
                    col(s"${col_name}.address"), 
                    lit(null).cast("bigint").as("age"),
                    col(s"${col_name}.name")
                  )

    case true => col(col_name)
  }

  arrayOfExampleDFs = arrayOfExampleDFs :+ currentDf.select(lit(col_name).as("Item"), valueCol.as("Value"))
}

val jsonDF = arrayOfExampleDFs.reduce(_ union _)

// +--------+---------------------------------+
// |Item    |Value                            |
// +--------+---------------------------------+
// |ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
// |ITEM1512|[[Columbus, Ohio], 28, Yin]      |
// |ITEM1518|[[Working, Marc],, Yang]         |
// +--------+---------------------------------+

Analysis: probably the most demanding part is finding out whether age is present or not. For the lookup we use the df.schema.fields property, which lets us dig into the internal schema of each column.

When age is not found, we regenerate the column using struct. Note that lit(null).cast("bigint") keeps age at the LongType that Spark inferred for the other items, and the fields are listed in the same order as in the common schema (address, age, name); union matches columns, and struct fields, by position and type, so both details are required:

struct(
   col(s"${col_name}.address"), 
   lit(null).cast("bigint").as("age"),
   col(s"${col_name}.name")
)
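
As a side note, on Spark 3.1 and later much of this manual work can be avoided: unionByName with allowMissingColumns = true matches columns by name and null-fills the missing ones, including fields nested inside structs. A sketch, assuming a recent Spark version:

// Requires Spark 3.1+; null-fills the missing age field inside the Value struct
val jsonDF = arrayOfExampleDFs.reduce((a, b) => a.unionByName(b, allowMissingColumns = true))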
