How to rename Spark DataFrame output files on AWS S3 in Spark Scala

Question

I am saving my Spark DataFrame output as CSV files in Scala, partitioned by column. This is how I do it in Zeppelin:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// udf, input_file_name, when, col, concat_ws, regexp_replace, rank, ...
import org.apache.spark.sql.functions._

val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))

val rdd = sc.textFile("s3://trfsmallfffile/FinancialLineItem/MAIN")
val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

// keep the original dotted column names for the output header
val schemaHeader = StructType(header.map(cols => StructField(cols, StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal=data.withColumn("DataPartition", get_cus_val(input_file_name))
val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialLineItem/INCR")
val header1 = rdd1.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)


import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")


val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
      .select($"LineItem_organizationId", $"LineItem_lineItemId",
        when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
        when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
        when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
        when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
        when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
        when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
        .filter(!$"FFAction|!|".contains("D|!|"))

val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition",$"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq

val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

// "|" and "^" are regex metacharacters, so the literal "|^|null" has to be escaped
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "\\|\\^\\|null", "")).withColumnRenamed("concatenated", header)


dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/FinancialLineItem/output")

val FFRowCount = dfMainOutputFinalWithoutNull.groupBy("DataPartition", "StatementTypeCode").count

FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trfsmallfffile/FinancialLineItem/Descr")

The files are now saved in the expected partitioned folder structure.

Now my requirement is to rename all the part files and save them in one directory, with each file named after its folder structure.

For example, I have the file folder/DataPartition=Japan/PartitionYear=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz

I want the files to be named:

Japan.1971.1.txt.gz
Japan.1971.2.txt.gz
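
The mapping from a part file's directory to its new name can be written as a plain Scala function. This is a minimal sketch: `PartFileNames` is a hypothetical helper, and the sequence number `n` is supplied by the caller, since the question only shows the numbers 1 and 2 without saying how they are assigned. Spark escapes some special characters in partition directory names, so unusual partition values may need unescaping first.

```scala
object PartFileNames {
  // Values of the Hive-style "key=value" directories above a part file, in path order.
  def partitionValues(partFilePath: String): Seq[String] =
    partFilePath.split("/").toSeq
      .dropRight(1) // ignore the part file's own name
      .collect { case seg if seg.contains("=") => seg.substring(seg.indexOf('=') + 1) }

  // "<value1>.<value2>.<n>.txt.gz"; n is a caller-supplied sequence number (assumption).
  def targetName(partFilePath: String, n: Int): String =
    (partitionValues(partFilePath) :+ n.toString).mkString(".") + ".txt.gz"
}
```

For the example file above, `PartFileNames.targetName(path, 1)` returns `Japan.1971.1.txt.gz`.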

I have done this in Java MapReduce: after the job completed, I read the output from the HDFS file system and moved each file to a different location under its new name.

But how can I do this on the AWS S3 file system in Spark Scala?

As far as I have researched, there is no direct way to rename the output files of a Spark DataFrame write.

But it seems this can be done within the job itself, using MultipleOutputs with saveAsHadoopFile. How would I do that?
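
One way to name the files inside the job, sketched under assumptions: rather than MultipleOutputs, this uses the old-API `MultipleTextOutputFormat` (package `org.apache.hadoop.mapred.lib`), which `saveAsHadoopFile` accepts and which lets a subclass choose each record's file name from its key. The class name `KeyNamedTextOutputFormat`, the column positions, and the 1-based numbering derived from the task number are illustrative choices, not part of the question. Compiling the class into the application jar is probably safer than defining it in a notebook paragraph, because Hadoop instantiates it by reflection.

```scala
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.sql.DataFrame

// Old-API output format that names each file after the record's key instead of "part-NNNNN".
class KeyNamedTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Emit only the value; a NullWritable key makes TextOutputFormat skip the "key<TAB>" prefix.
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()

  // `name` is the default leaf name such as "part-00003"; reusing its task number keeps
  // files written by different tasks for the same key from colliding.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    KeyNamedTextOutputFormat.fileName(key.toString, name)
}

object KeyNamedTextOutputFormat {
  // "Japan.1971" + "part-00000" -> "Japan.1971.1.txt"; the gzip codec then appends ".gz".
  def fileName(key: String, taskFileName: String): String =
    s"$key.${taskFileName.stripPrefix("part-").toInt + 1}.txt"

  // Assumes columns 0 and 1 are DataPartition and StatementTypeCode and column 2 is the
  // already-concatenated output line, as in the question's dfMainOutputFinalWithoutNull.
  def save(df: DataFrame, outputPath: String): Unit =
    df.rdd
      .map(r => (s"${r.getString(0)}.${r.getString(1)}", r.getString(2)))
      .saveAsHadoopFile(outputPath, classOf[String], classOf[String],
        classOf[KeyNamedTextOutputFormat], classOf[GzipCodec])
}
```

A call such as `KeyNamedTextOutputFormat.save(dfMainOutputFinalWithoutNull, "s3://trfsmallfffile/FinancialLineItem/renamed")` writes flat files like `Japan.1971.1.txt.gz` (the `renamed` path is hypothetical and must not exist yet). Each task writes its own file per key, so a key spread over several tasks gets non-contiguous numbers; repartitioning by `DataPartition` and `StatementTypeCode` first yields one file per key. No header line is written, and the CSV options (delimiter, quote) do not apply because each value is written as raw text.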

I am looking for some sample code in Scala.

In other words, after the job completes, the files need to be read from S3, renamed, and moved to another location.
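
The post-job approach from MapReduce carries over through the Hadoop `FileSystem` API, which Spark's S3 connectors implement. A minimal sketch under assumptions: `RenamePartFiles` is a hypothetical helper, files are numbered from 1 within each partition in part-file name order, and the destination is one flat directory. S3 has no atomic rename (a rename there is a copy followed by a delete), so the sketch uses `FileUtil.copy` with `deleteSource = true`, which also works when source and destination are on different filesystems.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, LocatedFileStatus, Path}
import scala.collection.mutable

object RenamePartFiles {
  // Values of the key=value directories that hold `file`, e.g. Seq("Japan", "1971").
  def partitionValues(file: Path): Seq[String] =
    file.getParent.toUri.getPath.split("/").toSeq
      .filter(_.contains("="))
      .map(seg => seg.substring(seg.indexOf('=') + 1))

  // Moves every part-* file under srcRoot into dstDir as "<v1>.<v2>.<n>.txt.gz".
  def run(srcRoot: String, dstDir: String, conf: Configuration): Unit = {
    val src = new Path(srcRoot)
    val dst = new Path(dstDir)
    val srcFs = src.getFileSystem(conf)
    val dstFs = dst.getFileSystem(conf)

    val files = mutable.ArrayBuffer.empty[LocatedFileStatus]
    val it = srcFs.listFiles(src, true) // true = recurse into the partition directories
    while (it.hasNext) files += it.next()

    files.filter(_.getPath.getName.startsWith("part-")) // skips _SUCCESS and similar
      .groupBy(f => partitionValues(f.getPath))
      .foreach { case (values, group) =>
        group.sortBy(_.getPath.getName).zipWithIndex.foreach { case (f, i) =>
          val target = new Path(dst, (values :+ (i + 1).toString).mkString(".") + ".txt.gz")
          // copy, then delete the source: the same work an S3 "rename" does anyway
          FileUtil.copy(srcFs, f.getPath, dstFs, target, true, conf)
        }
      }
  }
}
```

After the write in the question finishes, it could be called as `RenamePartFiles.run("s3://trfsmallfffile/FinancialLineItem/output", "s3://trfsmallfffile/FinancialLineItem/renamed", spark.sparkContext.hadoopConfiguration)`, where the `renamed` directory is hypothetical. The copies run one at a time on the driver, which is fine for a moderate number of files.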

Answer

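import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode

// headerDf and outDf must each have a single string column: the "text" format writes exactly one column
val tempOutPath = "mediamath.dir"
headerDf.union(outDf)
  .coalesce(1)                    // one output file; unlike repartition(1) there is no shuffle, so in practice the header rows stay first
  .write
  .mode(SaveMode.Overwrite)
  .format("text")
  .option("compression", "gzip")  // the text source reads "compression" (CSV also accepts "codec")
  .save(tempOutPath)

// Resolve the FileSystem from the path itself rather than the cluster default
val fs = new Path(tempOutPath).getFileSystem(spark.sparkContext.hadoopConfiguration)
val file = fs.globStatus(new Path(tempOutPath + "/part*.gz"))(0).getPath.getName

// FileSystem.rename only works within one filesystem, so the target must be on the same one as tempOutPath
fs.rename(new Path(tempOutPath + "/" + file), new Path(<aws-s3-path>))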

Here is my code snippet; please see if it helps.
