Using Spark to write a parquet file to s3 over s3a is very slow

Problem Description

I'm trying to write a parquet file out to Amazon S3 using Spark 1.6.1. The small parquet that I'm generating is ~2GB once written so it's not that much data. I'm trying to prove Spark out as a platform that I can use.

Basically what I'm doing is setting up a star schema with dataframes, then I'm going to write those tables out to parquet. The data comes in from csv files provided by a vendor and I'm using Spark as an ETL platform. I currently have a 3 node cluster in EC2 (r3.2xlarge), so 120GB of memory on the executors and 16 cores total.

The input files total about 22GB and I'm extracting about 2GB of that data for now. Eventually this will be many terabytes when I start loading the full dataset.

Here is my spark/scala pseudocode:

  // Imports needed for this snippet to compile (SQLContext, Row, schema types, SaveMode)
  import org.apache.spark.sql.{Row, SQLContext, SaveMode}
  import org.apache.spark.sql.types.{StringType, StructField, StructType}

  def loadStage(): Unit = {
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
    var sqlCtx = new SQLContext(sc)


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

    //Setup header table/df
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
    header.registerTempTable("header")
    sqlCtx.cacheTable("header")


    //Setup fact table/df
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
    val df = sqlCtx.createDataFrame(records, factSchema)
    df.registerTempTable("fact")

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")


    println(results.count())



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")


  }

The count takes about 2 minutes for 465884512 rows. The write to parquet takes 38 minutes.

I understand that the coalesce funnels everything into a single partition, so one task ends up doing the write... but the amount of time it's taking makes me think I'm doing something seriously wrong. Without the coalesce, this still takes 15 minutes, which IMO is still too long and gives me a ton of small parquet files. I'd like to have one large file per day of data. I have code to do the partitioning by a field value as well, and it is just as slow. I've also tried outputting this to csv, and that takes ~1 hour.
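
For reference, the partitioned write mentioned above looks roughly like the following. This is a sketch: the partition column (rel_date, taken from the schema above) and the output path are placeholders rather than my exact code.

  // Hedged sketch of the per-field partitioned write; one subdirectory
  // (and at least one file) is produced per distinct rel_date value.
  results
    .write
    .mode(SaveMode.Overwrite)
    .partitionBy("rel_date")
    .parquet("s3a://my-bucket/a/joined_data_by_date.parquet")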

Also, I'm not really setting any runtime properties when I'm submitting my job (a sketch of the kind of settings I mean follows the stats below). My console stats for one job are:

  • Alive Workers: 2
  • Cores in use: 16 Total, 16 Used
  • Memory in use: 117.5 GB Total, 107.5 GB Used
  • Applications: 1 Running, 5 Completed
  • Drivers: 0 Running, 0 Completed
  • Status: ALIVE
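
To be concrete, these are the sorts of properties I could be setting but currently am not. The values below are placeholders for an r3.2xlarge-based cluster, not numbers I have actually tuned:

  // Hedged sketch of sizing the job explicitly instead of relying on defaults.
  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("loadStage")
    .set("spark.executor.memory", "40g")     // heap per executor (placeholder)
    .set("spark.executor.cores", "8")        // cores per executor (placeholder)
    .set("spark.cores.max", "16")            // cap on total cores in standalone mode
    .set("spark.default.parallelism", "64")  // default shuffle/RDD partition count
  val sc = new SparkContext(conf)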

Solution

Spark defaults cause a large amount of (probably) unnecessary overhead during I/O operations, especially when writing to S3. This article discusses this more thoroughly, but there are 2 settings you'll want to consider changing.

  • Using the DirectParquetOutputCommitter. By default, Spark will save all of the data to a temporary folder and then move those files afterwards. Using the DirectParquetOutputCommitter saves time by writing directly to the S3 output path

    • No longer available in Spark 2.0+
      • As stated in the jira ticket, the current solution is to

        1. Switch your code to using s3a and Hadoop 2.7.2+; it's better all round, gets better in Hadoop 2.8, and is the basis for s3guard
        2. Use the Hadoop FileOutputCommitter and set mapreduce.fileoutputcommitter.algorithm.version to 2 (a configuration sketch follows this list)

  • Turn off schema merging. It is already off by default as of Spark 1.5, but if it is on, the driver node will scan all of the files to ensure a consistent schema. This is especially costly because it is not a distributed operation. Make sure it stays off by doing:

    val file = sqx.read.option("mergeSchema", "false").parquet(path)
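
For reference, a minimal sketch of how both recommendations could be applied on Spark 1.6 with Hadoop 2.7.2+, reusing the sc and sqlCtx from the question (the bucket path is a placeholder):

  // Use the standard FileOutputCommitter with algorithm version 2, which commits
  // task output with a single rename instead of a second job-level rename pass.
  sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

  // Keep Parquet schema merging off. It is off by default since Spark 1.5, but being
  // explicit avoids a non-distributed scan of every file footer on the driver.
  sqlCtx.setConf("spark.sql.parquet.mergeSchema", "false")
  val joined = sqlCtx.read
    .option("mergeSchema", "false")
    .parquet("s3a://my-bucket/a/joined_data.parquet")

The same committer setting can also be passed at submit time with --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2, which Spark copies into the Hadoop configuration.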
