How to rename Spark data frame output files in AWS in Spark (Scala)


Problem description

I am saving my Spark data frame output as CSV files in Scala, with partitions. This is how I do it in Zeppelin:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

import org.apache.spark.{SparkConf, SparkContext}
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Brings in udf, input_file_name, rank, when, concat_ws, col, regexp_replace
// and the other column functions used below.
import org.apache.spark.sql.functions._

// UDF that extracts the partition value (the fourth dot-separated token)
// from the source file path.
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))

// Read the main file, take the header row, and build a DataFrame whose
// column names replace "." with "_".
val rdd = sc.textFile("s3://trfsmallfffile/FinancialLineItem/MAIN")
val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

// The same data with the original dotted column names, used later only to
// rebuild the output header line.
val schemaHeader = StructType(header.map(cols => StructField(cols, StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

// Tag each row with the DataPartition value derived from its source file name.
val df1resultFinal = data.withColumn("DataPartition", get_cus_val(input_file_name))

// Read the incremental (INCR) file the same way as the main file.
val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialLineItem/INCR")
val header1 = rdd1.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)


import org.apache.spark.sql.expressions._

// Keep only the most recent record per key, then drop the helper columns.
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")


// Full outer join of the main and incremental data, preferring the
// incremental (_1) columns when present and dropping deleted (D|!|) rows.
val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
      .select($"LineItem_organizationId", $"LineItem_lineItemId",
        when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
        when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
        when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
        when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
        when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
        when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
        .filter(!$"FFAction|!|".contains("D|!|"))

// Collapse every column except DataPartition into a single |^|-delimited
// field, and reuse the original dotted header line as its column name.
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition",$"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq

val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

// The delimiter characters are regex metacharacters, so they must be escaped
// for regexp_replace to match the literal |^|null sequence.
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "\\|\\^\\|null", "")).withColumnRenamed("concatenated", header)


// Write one gzipped, tab-delimited file per (DataPartition, StatementTypeCode).
dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/FinancialLineItem/output")

val FFRowCount = dfMainOutputFinalWithoutNull.groupBy("DataPartition","StatementTypeCode").count

FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trfsmallfffile/FinancialLineItem/Descr")

Now the files are saved in the partitioned folder structure that is expected.

Now my requirement is to rename all the part files and save them in one directory, where each file takes its name from the folder structure it was written under.

For example, I have one file saved at folder/DataPartition=Japan/PartitionYear=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz

Now I want my file names to be:

Japan.1971.1.txt.gz
Japan.1971.2.txt.gz
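
To make the mapping concrete, here is a minimal sketch of how such a name could be derived from a part file's path. The targetName helper, its index parameter, and the PartitionYear folder level (taken from the example path above) are illustrative, not code from the job:

def targetName(partFilePath: String, index: Int): String = {
  // Pull the partition values out of .../DataPartition=Japan/PartitionYear=1971/...
  val partition = "DataPartition=([^/]+)".r.findFirstMatchIn(partFilePath).map(_.group(1)).getOrElse("unknown")
  val year = "PartitionYear=([^/]+)".r.findFirstMatchIn(partFilePath).map(_.group(1)).getOrElse("unknown")
  s"$partition.$year.$index.txt.gz"
}

// targetName("folder/DataPartition=Japan/PartitionYear=1971/part-00001-...csv.gz", 1)
// returns "Japan.1971.1.txt.gz"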

I have done this in Java MapReduce: after my job completed, I read the HDFS file system and moved the files to a different location under their renamed file names.

But how do I do this on the AWS S3 file system in Spark (Scala)?

As far as I have researched, there is no direct way to rename Spark data frame output file names.

But there is an implementation that can be done within the job itself, using MultipleOutputs with saveAsHadoopFile. How can that be done?
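
For reference, here is a minimal sketch of what that MultipleOutputs-style approach could look like, using the old mapred API's MultipleTextOutputFormat so that each key chooses its own file name. The KeyBasedOutput class, the key format, and the outputByKey path are assumptions for illustration, not code from the question:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  // Use the record's key to build the file name; the task's default part
  // name is kept as a suffix so concurrent tasks do not collide.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    s"${key.asInstanceOf[String]}.$name"

  // Drop the key from the written record so only the value line remains.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

// Hypothetical pairing: the key decides the file name, the value is the line.
val byKey = dfMainOutputFinalWithoutNull.rdd.map { row =>
  val key = s"${row.getAs[String]("DataPartition")}.${row.getAs[String]("StatementTypeCode")}"
  (key, row.mkString("\t"))
}

byKey.saveAsHadoopFile(
  "s3://trfsmallfffile/FinancialLineItem/outputByKey",
  classOf[String],
  classOf[String],
  classOf[KeyBasedOutput],
  classOf[GzipCodec]
)

Note that this bypasses the DataFrame CSV writer entirely, so the header line and any quoting have to be produced by hand, much as the concatenated column above already does.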

I am looking for some sample code in Scala.

Essentially, after the job completes, we need to read the files from S3, rename them, and move them to some other location.

Recommended answer

import org.apache.spark.sql.SaveMode
import org.apache.hadoop.fs._

// Write the single-partition output to a temporary directory first.
val tempOutPath = "mediamath.dir"
headerDf.union(outDf)
  .repartition(1)
  .write
  .mode(SaveMode.Overwrite)
  .format("text")
  .option("codec", "gzip")
  .save(tempOutPath)

// Locate the lone part file and move it to its final name on S3.
val sc = spark.sparkContext
val fs = FileSystem.get(sc.hadoopConfiguration)
val file = fs.globStatus(new Path("mediamath.dir/part*.gz"))(0).getPath.getName

fs.rename(new Path("mediamath.dir/" + file), new Path(<aws-s3-path>))

Here is my code snippet; please see if it helps you.
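
As a follow-up, the same FileSystem calls could be extended to walk every partition folder the question's job writes and produce the Japan.1971.1.txt.gz-style names. This is only a sketch under the question's layout: the job above partitions by DataPartition and StatementTypeCode (swap in PartitionYear if that is the real second level), and the per-folder zipWithIndex counter for the trailing number is an assumption:

import org.apache.hadoop.fs.{FileSystem, Path}

val outputRoot = "s3://trfsmallfffile/FinancialLineItem/output"
val fs = FileSystem.get(new java.net.URI(outputRoot), spark.sparkContext.hadoopConfiguration)

// Collect every part file under the two partition levels written above.
val partFiles = fs.globStatus(new Path(s"$outputRoot/DataPartition=*/StatementTypeCode=*/part*.gz"))

// Number the files per partition folder and rename them in place.
partFiles.groupBy(_.getPath.getParent).foreach { case (dir, files) =>
  val statementType = dir.getName.split("=")(1)
  val dataPartition = dir.getParent.getName.split("=")(1)
  files.sortBy(_.getPath.getName).zipWithIndex.foreach { case (status, i) =>
    val dst = new Path(s"$outputRoot/$dataPartition.$statementType.${i + 1}.txt.gz")
    fs.rename(status.getPath, dst)
  }
}

Keep in mind that S3 has no true rename; the Hadoop S3 connectors implement rename as copy plus delete, so this final pass costs time proportional to the size of the data.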

