Using Spark to write a parquet file to s3 over s3a is very slow


Question

I'm trying to write a parquet file out to Amazon S3 using Spark 1.6.1. The small parquet that I'm generating is ~2GB once written so it's not that much data. I'm trying to prove Spark out as a platform that I can use.

Basically what I'm doing is setting up a star schema with dataframes, then writing those tables out to parquet. The data comes in from csv files provided by a vendor and I'm using Spark as an ETL platform. I currently have a 3 node cluster in EC2 (r3.2xlarge), so 120GB of memory on the executors and 16 cores total.

The input files total about 22GB and I'm extracting about 2GB of that data for now. Eventually this will be many terabytes when I start loading the full dataset.

Here is my Spark/Scala pseudocode:

  import org.apache.spark.sql.{Row, SQLContext, SaveMode}
  import org.apache.spark.sql.types.{StructField, StructType, StringType}

  def loadStage(): Unit = {
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class", "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet", "false")
    val sqlCtx = new SQLContext(sc)

    // Raw pipe-delimited, gzipped vendor files on S3
    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

    // Setup header table/df (record type "1")
    val header_rec = DataFile.map(_.split("\\|")).filter(x => x(0) == "1")
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType, false)))
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6)))
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
    header.registerTempTable("header")
    sqlCtx.cacheTable("header")

    // Setup fact table/df (record type "2")
    val fact_recs = DataFile.map(_.split("\\|")).filter(x => x(0) == "2")
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType, false)))
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
    val df = sqlCtx.createDataFrame(records, factSchema)
    df.registerTempTable("fact")

    // Join facts to headers on market/date
    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")

    println(results.count())

    // Single-file output: ~2 minutes for the count, ~38 minutes for this write
    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
  }

The count takes about 2 minutes for 465884512 rows. The write to parquet takes 38 minutes

I understand that the coalesce does a shuffle to the driver which does the write... but the amount of time it's taking is making me think I'm doing something seriously wrong. Without the coalesce, this still takes 15 minutes, which IMO is still too long and gives me a ton of small parquet files. I'd like to have one large file per day of data. I have code to do the partitioning by a field value as well, and it is just as slow. I've also tried to output this to csv and that takes ~1 hour.
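
The partitioned variant is roughly along these lines (a sketch only, using rel_date from the schema above; the output path is illustrative and this is not necessarily my exact code):

  // Sketch: write one directory per rel_date instead of coalescing to a single file
  results
    .write
    .mode(SaveMode.Overwrite)
    .partitionBy("rel_date")
    .parquet("s3a://my-bucket/a/joined_data_by_date")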

Also, I'm not really setting any runtime properties when I'm submitting my job. My console stats for one job are:

  • Alive workers: 2
  • Cores in use: 16 total, 16 used
  • Memory in use: 117.5 GB total, 107.5 GB used
  • Applications: 1 running, 5 completed
  • Drivers: 0 running, 0 completed
  • Status: Alive

Answer

Spark defaults cause a large amount of (probably) unnecessary overhead during I/O operations, especially when writing to S3. This article discusses this more thoroughly, but there are 2 settings you'll want to consider changing.

  • Use the DirectParquetOutputCommitter. By default, Spark saves all of the data to a temporary folder and then moves those files afterwards. Using the DirectParquetOutputCommitter saves time by writing directly to the S3 output path (see the configuration sketch after this list).
    • No longer available in Spark 2.0+
      • As stated in the jira ticket, the current solution is to:
        1. Switch your code to using s3a and Hadoop 2.7.2+; it's better all round, gets better in Hadoop 2.8, and is the basis for s3guard
        2. Use the Hadoop FileOutputCommitter and set mapreduce.fileoutputcommitter.algorithm.version to 2

  • Schema merging is turned off by default as of Spark 1.5. Turn off schema merging: if it is on, the driver node will scan all of the files to ensure a consistent schema. This is especially costly because it is not a distributed operation. Make sure it is turned off by doing

      val file = sqx.read.option("mergeSchema", "false").parquet(path)
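
For reference, here is a minimal sketch of the two committer-related changes above, assuming the same sc and sqlCtx as in the question. The DirectParquetOutputCommitter's package name moved between Spark releases, so check the exact class path against your version:

  // Option A (Spark 1.x only): ask Spark SQL to use the direct Parquet committer,
  // which writes straight to the S3 output path instead of renaming from _temporary.
  // (The class's package differs between Spark releases; use the one your version ships.)
  sqlCtx.setConf("spark.sql.parquet.output.committer.class",
    "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

  // Option B (also works on Spark 2.0+): keep the Hadoop FileOutputCommitter but switch
  // to commit algorithm version 2, which commits task output directly and skips the
  // expensive job-level rename pass.
  sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

The Hadoop property can also be passed at submit time with --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2.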
