Renaming Columns Recursively in a Nested Structure in Spark


Problem Description

I am trying to replace certain characters in all the columns of my DataFrame, which has a lot of nested StructTypes.

I tried to process the schema fields recursively, and for some reason it only renames the fields at the top level, even though it is reaching the leaf nodes.

I am trying to replace the ':' character in the column names with '_'.

Here is the Scala code I have written.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructType}
import org.slf4j.LoggerFactory

class UpdateSchema {

  val logger = LoggerFactory.getLogger(classOf[UpdateSchema])

  Logger.getLogger("org").setLevel(Level.OFF)

  Logger.getLogger("akka").setLevel(Level.OFF)

  val sparkSession = SparkLauncher.spark

  import sparkSession.implicits._   

  def updateSchema(filePath:String):Boolean ={
    logger.info(".updateSchema() : filePath ={}",filePath);
    logger.info(".updateSchema() : sparkSession ={}",sparkSession);
    if(sparkSession!=null){
      var xmlDF = sparkSession
                  .read
                  .format("com.databricks.spark.xml")
                  .option("rowTag","ns:fltdMessage")
                  .option("inferschema","true")
                  .option("attributePrefix","attr_")
                  .load(filePath)
                  .toDF()

      xmlDF.printSchema()
      val updatedDF = renameDataFrameColumns(xmlDF.toDF()) 
      updatedDF.printSchema()
    }
    else
      logger.info(".updateSchema(): Spark Session is NULL !!!");
    false;
  }


    def replaceSpecialChars(str:String):String ={
          val newColumn:String =  str.replaceAll(":", "_")
          //logger.info(".replaceSpecialChars() : Old Column Name =["+str+"] New Column Name =["+newColumn+"]")
          return newColumn
      }

      def renameColumn(df:DataFrame,colName:String,prefix:String):DataFrame ={
        val newColName:String = replaceSpecialChars(colName)
        logger.info(".renameColumn(): prefix=["+prefix+"] colName=["+colName+"] New Column Name=["+newColName+"]")
        if(prefix.equals("")){
          if(df.col(colName)!=null){
            return df.withColumnRenamed(colName, replaceSpecialChars(colName))
          }
          else{
            logger.error(".logSchema() : Column ["+prefix+"."+colName+"] Not found in DataFrame !! ")
            logger.info("Prefix ="+prefix+" Existing Columns =["+df.columns.mkString("),(")+"]")
            throw new Exception("Unable to find Column ["+prefix+"."+colName+"]")
          }
        }
        else{
          if(df.col(prefix+"."+colName)!=null){
            return df.withColumnRenamed(prefix+"."+colName, prefix+"."+replaceSpecialChars(colName))
          }
          else{
            logger.error(".logSchema() : Column ["+prefix+"."+colName+"] Not found in DataFrame !! ")
            logger.info("Prefix ="+prefix+" Existing Columns =["+df.columns.mkString("),(")+"]")
            throw new Exception("Unable to find Column ["+prefix+"."+colName+"]")
          }
        }
      }

      def getStructType(schema:StructType,fieldName:String):StructType = {
        schema.fields.foreach(field => {
              field.dataType match{
                case st:StructType => {
                  logger.info(".getStructType(): Current Field Name =["+field.name.toString()+"] Checking for =["+fieldName+"]")
                  if(field.name.toString().equals(fieldName)){
                    return field.dataType.asInstanceOf[StructType]
                  }
                  else{
                    getStructType(st,fieldName)
                  }
                }
                case _ =>{
                  logger.info(".getStructType(): Non Struct Type. Ignoring Filed=["+field.name.toString()+"]");
                }
              }
          })
          throw new Exception("Unable to find Struct Type for filed Name["+fieldName+"]")
      }

      def processSchema(df:DataFrame,schema:StructType,prefix:String):DataFrame ={
        var updatedDF:DataFrame =df
        schema.fields.foreach(field =>{
          field.dataType match {
            case st:StructType => {
                logger.info(".processSchema() : Struct Type =["+st+"]");
                logger.info(".processSchema() : Field Data Type =["+field.dataType+"]");
                logger.info(".processSchema() : Renaming the Struct Field =["+field.name.toString()+"] st=["+st.fieldNames.mkString(",")+"]") 
                updatedDF = renameColumn(updatedDF,field.name.toString(),prefix)
                logger.info(".processSchema() : Column List after Rename =["+updatedDF.columns.mkString(",")+"]")
               // updatedDF.schema.fields.foldLeft(z)(op)
                val renamedCol:String = replaceSpecialChars(field.name.toString())
                var fieldType:DataType = null;
                //if(prefix.equals(""))
                fieldType = schema.fields.find(f =>{ (f.name.toString().equals(field.name.toString()))}).get.dataType

                if(prefix.trim().equals("") 
                    //&& fieldType.isInstanceOf[StructType]
                    ){
                  updatedDF = processSchema(updatedDF,
                      getStructType(updatedDF.schema,renamedCol),
                      replaceSpecialChars(field.name.toString()))
                }
                else{
                  updatedDF = processSchema(updatedDF,
                      getStructType(updatedDF.schema,renamedCol),
                      prefix+"."+replaceSpecialChars(field.name.toString()))
                }
              }
            case _ => {
              updatedDF = renameColumn(updatedDF,field.name.toString(),prefix)
            }
          }
        })
        //updatedDF.printSchema()


        return updatedDF
      }

      def renameDataFrameColumns(df:DataFrame):DataFrame ={
        val schema = df.schema;
        return processSchema(df,schema,"")
      }
}
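
For context on why the code above only renames top-level fields: withColumnRenamed matches top-level column names only, so a dotted name such as prefix + "." + colName is never a valid rename target, and a non-matching name makes the call a silent no-op rather than an error. Here is a minimal sketch of that behavior (the column names are illustrative, and a SparkSession named spark is assumed to be in scope):

import spark.implicits._

val demo = Seq((1, ("x", 2))).toDF("a:b", "nested")

// Renaming a top-level column works as expected:
demo.withColumnRenamed("a:b", "a_b").printSchema
// root
//  |-- a_b: integer (nullable = false)
//  |-- nested: struct (nullable = true)
//  |    |-- _1: string (nullable = true)
//  |    |-- _2: integer (nullable = false)

// A dotted path to a nested field is silently ignored; the schema is unchanged:
demo.withColumnRenamed("nested._1", "nested.renamed").printSchema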

Recommended Answer

Here's a recursive method that revises a DataFrame schema by renaming, via replaceAll, any column whose name contains the substring to be replaced:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

def renameAllColumns(schema: StructType, from: String, to:String): StructType = {

  def recurRename(schema: StructType, from: String, to:String): Seq[StructField] =
    schema.fields.map{
      case StructField(name, dtype: StructType, nullable, meta) =>
        StructField(name.replaceAll(from, to), StructType(recurRename(dtype, from, to)), nullable, meta)
      case StructField(name, dtype, nullable, meta) =>
        StructField(name.replaceAll(from, to), dtype, nullable, meta)
    }

  StructType(recurRename(schema, from, to))
}

Testing the method on a sample DataFrame with a nested structure:

case class M(i: Int, `p:q`: String)
case class N(j: Int, m: M)

val df = Seq(
  (1, "a", N(7, M(11, "x"))),
  (2, "b", N(8, M(21, "y"))),
  (3, "c", N(9, M(31, "z")))
).toDF("c1", "c2:0", "c3")

df.printSchema
// root
//  |-- c1: integer (nullable = false)
//  |-- c2:0: string (nullable = true)
//  |-- c3: struct (nullable = true)
//  |    |-- j: integer (nullable = false)
//  |    |-- m: struct (nullable = true)
//  |    |    |-- i: integer (nullable = false)
//  |    |    |-- p:q: string (nullable = true)

val rdd = df.rdd

val newSchema = renameAllColumns(df.schema, ":", "_")

spark.createDataFrame(rdd, newSchema).printSchema
// root
//  |-- c1: integer (nullable = false)
//  |-- c2_0: string (nullable = true)
//  |-- c3: struct (nullable = true)
//  |    |-- j: integer (nullable = false)
//  |    |-- m: struct (nullable = true)
//  |    |    |-- i: integer (nullable = false)
//  |    |    |-- p_q: string (nullable = true)
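
Since only the field names change while the row data stays the same, the rows extracted above can simply be paired with the revised schema. If this is done repeatedly, the round trip can be wrapped in a small helper; withRenamedColumns below is a hypothetical convenience name, not part of the original answer:

import org.apache.spark.sql.DataFrame

// Hypothetical helper: rebuild the schema with renameAllColumns and
// re-attach it to the unchanged rows of the original DataFrame.
def withRenamedColumns(df: DataFrame, from: String, to: String): DataFrame =
  df.sparkSession.createDataFrame(df.rdd, renameAllColumns(df.schema, from, to))

withRenamedColumns(df, ":", "_").printSchema  // prints the same renamed schema as above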

Note that since replaceAll takes a regex pattern, the method can also be applied to trim off everything in a column name from the ':' character onward, for example:

val newSchema = renameAllColumns(df.schema, """:.*""", "")

spark.createDataFrame(rdd, newSchema).printSchema
// root
//  |-- c1: integer (nullable = false)
//  |-- c2: string (nullable = true)
//  |-- c3: struct (nullable = true)
//  |    |-- j: integer (nullable = false)
//  |    |-- m: struct (nullable = true)
//  |    |    |-- i: integer (nullable = false)
//  |    |    |-- p: string (nullable = true)
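
One caveat: recurRename only descends into StructType fields, so a struct nested inside an ArrayType or MapType would keep its old field names. A sketch of an extension covering those cases follows; renameNested is an illustrative name, not part of the original answer:

import org.apache.spark.sql.types._

// Sketch: recurse through structs, arrays and maps, applying
// replaceAll to every struct field name along the way.
def renameNested(dt: DataType, from: String, to: String): DataType = dt match {
  case st: StructType =>
    StructType(st.fields.map(f =>
      StructField(f.name.replaceAll(from, to), renameNested(f.dataType, from, to), f.nullable, f.metadata)))
  case ArrayType(elementType, containsNull) =>
    ArrayType(renameNested(elementType, from, to), containsNull)
  case MapType(keyType, valueType, valueContainsNull) =>
    MapType(renameNested(keyType, from, to), renameNested(valueType, from, to), valueContainsNull)
  case other => other
}

// Usage on the example DataFrame:
val fullSchema = renameNested(df.schema, ":", "_").asInstanceOf[StructType]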
