How to rename Spark DataFrame output files on AWS S3 in Spark Scala
Question
I am saving my Spark DataFrame output as partitioned CSV files in Scala. This is how I do it in Zeppelin:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Wildcard import: the code below also uses when, rank, col, concat_ws and regexp_replace.
import org.apache.spark.sql.functions._
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val rdd = sc.textFile("s3://trfsmallfffile/FinancialLineItem/MAIN")
val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)
val df1resultFinal=data.withColumn("DataPartition", get_cus_val(input_file_name))
val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialLineItem/INCR")
val header1 = rdd1.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
.select($"LineItem_organizationId", $"LineItem_lineItemId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
.filter(!$"FFAction|!|".contains("D|!|"))
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition",$"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val headerColumn = dataHeader.columns.toSeq
val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
// The pattern is a regex, so the | and ^ characters of the delimiter must be escaped.
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "\\|\\^\\|null", "")).withColumnRenamed("concatenated", header)
dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("nullValue", "")
.option("delimiter", "\t")
.option("quote", "\u0000")
.option("header", "true")
.option("codec", "gzip")
.save("s3://trfsmallfffile/FinancialLineItem/output")
val FFRowCount =dfMainOutputFinalWithoutNull.groupBy("DataPartition","StatementTypeCode").count
FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
.option("rootTag", "FFFileType")
.option("rowTag", "FFPhysicalFile")
.save("s3://trfsmallfffile/FinancialLineItem/Descr")
The files are now saved in the partitioned folder structure, as expected.
My requirement now is to rename all the part files and save them in a single directory. Each file name should be derived from the names in the folder structure.
For example, I have folder/DataPartition=Japan/PartitionYear=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz
Now I want the file names to be
Japan.1971.1.txt.gz
Japan.1971.2.txt.gz
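The naming rule in this example can be sketched as a small pure function. This is only an illustration under assumptions: the object and method names (PartitionNaming, partitionValues, targetName) are hypothetical, the partition values are taken from the key=value path segments in order, and the trailing number is assumed to be a 1-based running index per partition folder.

```scala
object PartitionNaming {
  // Extracts the values of the "key=value" path segments, in path order.
  // "folder/DataPartition=Japan/PartitionYear=1971/part-..." gives Seq("Japan", "1971").
  def partitionValues(path: String): Seq[String] =
    path.split("/").toSeq
      .filter(_.contains("="))
      .map(_.split("=", 2)(1))

  // Joins the partition values, a 1-based index and a suffix with dots.
  // The suffix "txt.gz" is taken from the desired names in the question.
  def targetName(path: String, index: Int, suffix: String = "txt.gz"): String =
    (partitionValues(path) :+ index.toString :+ suffix).mkString(".")
}
```

For instance, PartitionNaming.targetName(path, 2) would produce the second name for a given partition folder.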
I have done this before in Java MapReduce: after the job completed, I read the files from HDFS and moved them to a different location under the new names.
But how can I do this on the AWS S3 file system in Spark Scala?
As far as I have researched, there is no direct way to set the name of a Spark DataFrame output file.
However, it can apparently be done within the job itself by using MultipleOutputs with saveAsHadoopFile. How would I do that?
I am looking for some sample code in Scala.
In other words, after the job completes we need to read the files from S3, rename them, and move them to another location.
Recommended answer
import org.apache.spark.sql.SaveMode
val tempOutPath = "mediamath.dir"
headerDf.union(outDf)
.repartition(1)
.write
.mode(SaveMode.Overwrite)
.format("text")
.option("compression", "gzip") // the text writer reads "compression"; "codec" is an alias only in the CSV writer
.save(tempOutPath)
import org.apache.hadoop.fs._
val sc = spark.sparkContext
val fs = FileSystem.get(sc.hadoopConfiguration)
val file = fs.globStatus(new Path("mediamath.dir/part*.gz"))(0).getPath.getName
fs.rename(new Path("mediamath.dir/" + file), new Path(<aws-s3-path>))
Here is my code snippet; please see if it helps.
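The snippet above renames a single part file. For the partitioned layout in the question, the same idea could be extended to every part file under the output root. The following is a minimal sketch, not a tested production routine. The names RenamePartFiles, listPartFiles, partitionValues and renameAll are hypothetical. It assumes that source and destination are on the same file system (for S3, the same bucket), that the only key=value segments in the path are the partition directories, and that files in each partition folder are numbered from 1 in part-file name order. Note that on S3 a rename is carried out as a copy followed by a delete, so it is neither atomic nor cheap for large files.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ArrayBuffer

object RenamePartFiles {
  // Collects every file named "part-*" under root, at any depth.
  // This skips marker files such as _SUCCESS.
  def listPartFiles(fs: FileSystem, root: Path): Seq[Path] = {
    val it = fs.listFiles(root, true) // true = recursive
    val out = ArrayBuffer.empty[Path]
    while (it.hasNext) {
      val p = it.next().getPath
      if (p.getName.startsWith("part-")) out += p
    }
    out.toSeq
  }

  // Values of the "key=value" directories above the file, in path order.
  def partitionValues(file: Path): Seq[String] =
    file.getParent.toUri.getPath.split("/").toSeq
      .filter(_.contains("="))
      .map(_.split("=", 2)(1))

  // Moves each part file to dstDir as <value1>.<value2>.<n>.<suffix>
  // and returns the new paths.
  def renameAll(conf: Configuration, srcRoot: String, dstDir: String,
                suffix: String = "txt.gz"): Seq[Path] = {
    val src = new Path(srcRoot)
    val dst = new Path(dstDir)
    // Resolve the file system from the path's scheme instead of the cluster default.
    val fs = src.getFileSystem(conf)
    fs.mkdirs(dst)
    listPartFiles(fs, src)
      .groupBy(_.getParent) // one group per partition folder
      .toSeq
      .flatMap { case (_, files) =>
        files.sortBy(_.getName).zipWithIndex.map { case (f, i) =>
          val name = (partitionValues(f) :+ (i + 1).toString :+ suffix).mkString(".")
          val target = new Path(dst, name)
          if (!fs.rename(f, target))
            throw new java.io.IOException(s"rename failed: $f -> $target")
          target
        }
      }
  }
}
```

In the question's setting it might be called after the write finishes as RenamePartFiles.renameAll(spark.sparkContext.hadoopConfiguration, "s3://trfsmallfffile/FinancialLineItem/output", "s3://trfsmallfffile/FinancialLineItem/final"), where the final directory is a made-up destination.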