Merge Spark output CSV files with a single header


Question

I want to create a data processing pipeline in AWS to eventually use the processed data for machine learning.

I have a Scala script that takes raw data from S3, processes it, and writes it to HDFS or even S3 with Spark-CSV. I think I can use multiple files as input if I want to use the AWS Machine Learning tool to train a prediction model. But if I want to use something else, I presume it is best to receive a single CSV output file.

Currently, since I do not want to use repartition(1) or coalesce(1) for performance reasons, I have used hadoop fs -getmerge for manual testing. But because it just concatenates the contents of the job output files, I am running into a small problem: I need a single row of headers in the data file to train the prediction model.

If I use .option("header", "true") for spark-csv, it writes the header to every output file, and after merging I have as many header lines in the data as there were output files. But if the header option is false, it does not add any headers at all.
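
For reference, a minimal sketch of the write call in question (the DataFrame name df and the output path are hypothetical, shown only to illustrate the header option); with header = true, every part file ends up with its own header line:

  // Hypothetical df and path; with header = true, every part file gets a header line.
  // Databricks spark-csv package (Spark 1.x):
  df.write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("hdfs:///output/with-headers")

  // Built-in CSV writer (Spark 2.x+) behaves the same way:
  df.write
    .option("header", "true")
    .csv("hdfs:///output/with-headers")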

Now I found an option to merge the files inside the Scala script with the Hadoop API FileUtil.copyMerge. I tried this in spark-shell with the code below.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val configuration = new Configuration()
val fs = FileSystem.get(configuration)

// Concatenate every file under "smallheaders" into the single file "smallheaders2"
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")

But this solution still just concatenates the files on top of each other and does not handle the headers. How can I get an output file with only one row of headers?

I even tried adding df.columns.mkString(",") as the last argument for copyMerge, but this still added the header multiple times, not once.
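
That behaviour is expected: the addString parameter of FileUtil.copyMerge is appended after every source file that gets merged, not once for the whole output. A rough sketch of the attempted call (reusing the paths from the snippet above; df is the hypothetical DataFrame whose columns provide the header):

  // addString is written after *each* part file, so the header line is repeated
  // once per partition instead of appearing once at the top of the merged file.
  FileUtil.copyMerge(
    fs,
    new Path("smallheaders"),
    fs,
    new Path("/home/hadoop/smallheaders2"),
    false,
    configuration,
    df.columns.mkString(",")
  )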

Answer

You can work around it like this:

  • 1. Create a new DataFrame (headerDF) containing only the header names.
  • 2. Union it with the DataFrame (dataDF) containing the data.
  • 3. Write the unioned DataFrame to disk with option("header", "false").
  • 4. Merge the partition files (part-0000**0.csv) with Hadoop FileUtil.

This way, no partition has a header, except that a single partition contains the row of header names from headerDF. When all partitions are merged together, there is a single header at the top of the file. Sample code follows:

  import org.apache.spark.sql.{Row, SaveMode}
  import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

  // dataFrame is the data to save on disk.
  // Cast all columns to String so the data can be unioned with the string-only header row.
  val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)

  // Create a new DataFrame containing only the header names, reusing the (all-string) schema of dataDF.
  import scala.collection.JavaConverters._
  val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

  // Union the header row with the data and write it out without per-file headers.
  headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

  // Use Hadoop FileUtil to merge all partition csv files into a single file.
  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
  FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, sparkSession.sparkContext.hadoopConfiguration, null)
