Merge Spark output CSV files with a single header


Problem Description

I want to create a data processing pipeline in AWS to eventually use the processed data for Machine Learning.

I have a Scala script that takes raw data from S3, processes it, and writes it to HDFS or even S3 with Spark-CSV. I think I can use multiple files as input if I want to use the AWS Machine Learning tool for training a prediction model. But if I want to use something else, I presume it is best if I receive a single CSV output file.

Currently, as I do not want to use repartition(1) nor coalesce(1) for performance purposes, I have used hadoop fs -getmerge for manual testing, but as it just merges the contents of the job output files, I am running into a small problem. I need a single row of headers in the data file for training the prediction model.

If I use .option("header","true") with spark-csv, it writes the header to every output file, so after merging I have as many header lines in the data as there were output files. But if the header option is false, it does not add any headers at all.
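For illustration, a minimal sketch that reproduces this behavior with the built-in Spark 2 CSV writer (the session setup and both paths here are hypothetical, not from my actual script):

import org.apache.spark.sql.SparkSession

// hypothetical session and paths, for illustration only
val spark = SparkSession.builder().appName("header-demo").getOrCreate()
val df = spark.read.option("header", "true").csv("s3://my-bucket/raw/")

// with header=true every part file gets its own header row,
// so merging N part files yields N header lines
df.write.option("header", "true").csv("hdfs:///tmp/processed")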

Now I found an option to merge the files inside the Scala script with the Hadoop API's FileUtil.copyMerge. I tried this in spark-shell with the code below.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val configuration = new Configuration()
val fs = FileSystem.get(configuration)
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")

But this solution still just concatenates the files on top of each other and does not handle headers. How can I get an output file with only one row of headers?

I even tried adding df.columns.mkString(",") as the last argument for copyMerge, but this still added the headers multiple times rather than once, since copyMerge appends its addString argument after each merged file, not once for the whole merge.

Solution

You can work around it like this:

  1. Create a new DataFrame (headerDF) containing the header names.
  2. Union it with the DataFrame (dataDF) containing the data.
  3. Write the unioned DataFrame to disk with option("header", "false").
  4. Merge the partition files (part-0000**0.csv) using the Hadoop FileUtil.

This way, no partition has a header except the single partition whose content is the row of header names from headerDF. When all partitions are merged together, there is a single header at the top of the file. Sample code follows:

  import org.apache.spark.sql.{Row, SaveMode}
  import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
  import scala.collection.JavaConverters._

  // dataFrame is the data to save on disk;
  // cast all columns to String so the header row can be unioned with the data
  val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)

  // create a new DataFrame containing only the header names
  val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

  // merge the header names with the data, writing without per-file headers
  headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

  // use the Hadoop FileUtil to merge all partition csv files into a single file
  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
  FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, sparkSession.sparkContext.hadoopConfiguration, null)
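One caveat: FileUtil.copyMerge was removed in Hadoop 3, so on newer clusters the last step needs a stand-in. A minimal sketch of an equivalent manual merge (the mergeParts helper below is an assumption for illustration, not part of the original answer):

  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.hadoop.io.IOUtils

  // concatenate all part files in srcDir into dstFile in name order,
  // so part-00000 (which holds the header row) comes first
  def mergeParts(fs: FileSystem, srcDir: Path, dstFile: Path): Unit = {
    val out = fs.create(dstFile, true) // overwrite dstFile if it exists
    try {
      fs.listStatus(srcDir)
        .filter(_.getPath.getName.startsWith("part-"))
        .sortBy(_.getPath.getName)
        .foreach { part =>
          val in = fs.open(part.getPath)
          try IOUtils.copyBytes(in, out, 4096, false) // false: leave streams open for us to close
          finally in.close()
        }
    } finally out.close()
  }

  mergeParts(fs, new Path(outputFolder), new Path("/folder/target.csv"))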
