Schema comparison of two dataframes in Scala

Problem description

I am trying to write some test cases to validate the data between the source (.csv) file and the target (hive table). One of the validations is the structure validation of the table.

I have loaded the .csv data (using a defined schema) into one dataframe and extracted the hive table data into another dataframe.
When I now try to compare the schemas of the two dataframes, the comparison returns false. Not sure why. Any ideas on this, please?

Source dataframe schema:

scala> res39.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

Target dataframe schema:

scala> targetRawData.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

When I compare, it returns false:

scala> res39.schema == targetRawData.schema
res47: Boolean = false
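
A quick way to see why two schemas that print identically can still compare unequal is to diff the underlying StructField objects directly: StructField is a case class whose equality also covers per-field metadata, which printSchema does not display. A minimal sketch, assuming both dataframes are in scope:

// Zip the fields pairwise and print any pair that is not equal;
// StructField equality includes name, type, nullability and metadata.
res39.schema.fields.zip(targetRawData.schema.fields).foreach { case (s, t) =>
  if (s != t) println(s"source: $s\ntarget: $t\nmetadata: ${s.metadata} vs ${t.metadata}")
}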

Data in the two dataframes is shown below:

scala> res39.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS| Naveen |             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS| Naveen |             100|   115.78|
|2015-04-02 23:24:25|2015-04-02 23:24:25|        RBS|   Arun |             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|   Arun |             100|    30.98|
|2018-06-04 10:11:12|2018-06-04 10:11:12|        XZX|   Arun |             400|     12.0|
+-------------------+-------------------+-----------+--------+----------------+---------+


scala> targetRawData.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS|  Naveen|             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS|  Naveen|             100|   115.78|
|2015-04-02 23:25:25|2015-04-02 23:25:25|        RBS|    Arun|             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|    Arun|             100|    30.98|
+-------------------+-------------------+-----------+--------+----------------+---------+

The complete code is as below:

//import org.apache.spark
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext

  //val conf = new SparkConf().setAppName("Simple Application")
  //val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)
  val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()

   // set source and target location
    val sourceDataLocation = "hdfs://localhost:9000/source.txt"
    val targetTableName = "TableA"

    // Extract source data
    println("Extracting SAS source data from csv file location " + sourceDataLocation);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val sourceRawCsvData = sc.textFile(sourceDataLocation)

    println("Extracting target data from hive table " + targetTableName)
    val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)


    // Add the test cases here
    // Test 2 - Validate the Structure
       val headerColumns = sourceRawCsvData.first().split(",").to[List]
       val schema = TableASchema(headerColumns)

       // Skip the header row, then parse each line into a Row
       val data = sourceRawCsvData
         .mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
         .map(_.split(",").toList)
         .map(row)

       val dataFrame = spark.createDataFrame(data,schema)
       val sourceDataFrame = dataFrame.toDF(dataFrame.columns map(_.toLowerCase): _*)
       data.collect
       data.getClass
    // Test 3 - Validate the data
    // Test 4 - Calculate the average and variance of Int or Dec columns
    // Test 5 - Test 5

  def UpdateResult(tableName: String, returnCode: Int, description: String){
    val insertString = "INSERT INTO TestResult VALUES('" + tableName + "', " + returnCode + ",'" + description + "')"
    val a = hc.sql(insertString)

    }


  def TableASchema(columnName: List[String]): StructType = {
    StructType(
      Seq(
        StructField(name = "datetime", dataType = TimestampType, nullable = true),
        StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
        StructField(name = "source_bank", dataType = StringType, nullable = true),
        StructField(name = "emp_name", dataType = StringType, nullable = true),
        StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
        StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
        )
    )
  }

  def row(line: List[String]): Row = {
       Row(convertToTimestamp(line(0).trim), convertToTimestamp(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
    }


  def convertToTimestamp(s: String) : Timestamp = s match {
     case "" => null
     case _ => {
        val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
        Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
    }
  }

  }

Recommended answer

Based on @Derek Kaknes's answer, here's the solution I came up with for comparing schemas, concerned only with column name, datatype & nullability, and indifferent to metadata:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructField}

// Extract relevant information: name (key), type & nullability (values) of columns
def getCleanedSchema(df: DataFrame): Map[String, (DataType, Boolean)] = {
  df.schema.map { (structField: StructField) =>
    structField.name.toLowerCase -> (structField.dataType, structField.nullable)
  }.toMap
}

// Compare relevant information
def getSchemaDifference(schema1: Map[String, (DataType, Boolean)],
                        schema2: Map[String, (DataType, Boolean)]
                       ): Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] = {
  (schema1.keys ++ schema2.keys).
    map(_.toLowerCase).
    toList.distinct.
    flatMap { (columnName: String) =>
      // A column missing from either schema shows up as None on that side
      val schema1FieldOpt: Option[(DataType, Boolean)] = schema1.get(columnName)
      val schema2FieldOpt: Option[(DataType, Boolean)] = schema2.get(columnName)

      if (schema1FieldOpt == schema2FieldOpt) None
      else Some(columnName -> (schema1FieldOpt, schema2FieldOpt))
    }.toMap
}

• getCleanedSchema method extracts the information of interest - column datatype & nullability - and returns a map from column name to that tuple

• getSchemaDifference method returns a map containing only those columns that differ between the two schemas. If a column is absent from one of the two schemas, its corresponding properties will be None
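
The two helpers can then be wired together like this (a sketch, assuming sourceDataFrame and targetRawData from the question are in scope):

// Report any columns whose name, type or nullability differ between the two dataframes
val schemaDiff = getSchemaDifference(getCleanedSchema(sourceDataFrame), getCleanedSchema(targetRawData))

if (schemaDiff.isEmpty) println("Schemas match")
else schemaDiff.foreach { case (column, (source, target)) =>
  println(s"$column differs: source=$source, target=$target")
}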
