How to tune a Spark job on EMR to write huge data quickly to S3


Problem description

I have a Spark job where I am doing an outer join between two data frames. The first data frame is 260 GB of text files split into 2,200 files, and the second data frame is 2 GB. Writing the output data frame, which is about 260 GB, into S3 takes a very long time; I cancelled it after more than two hours because I had already been charged heavily on EMR.

Here is my cluster info.

emr-5.9.0
Master:    m3.2xlarge
Core:      r4.16xlarge   10 machines (each machine has 64 vCore, 488 GiB memory, EBS Storage: 100 GiB)

This is the cluster config I am setting:

capacity-scheduler  yarn.scheduler.capacity.resource-calculator :org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
emrfs-site  fs.s3.maxConnections:   200
spark   maximizeResourceAllocation: true
spark-defaults  spark.dynamicAllocation.enabled:    true

I also tried setting the memory components manually, as shown below. The performance was better, but it still took a very long time.

--num-executors 60 --conf spark.yarn.executor.memoryOverhead=9216 --executor-memory 72G --conf spark.yarn.driver.memoryOverhead=3072 --driver-memory 26G --executor-cores 10 --driver-cores 3 --conf spark.default.parallelism=1200

I am not using the default partitioning to save data into S3.

I am adding all the details about the job and the query plan so that it will be easy to understand.

The real reason is partitioning, and that is what takes most of the time. Because I have 2K input files, if I use a repartition value like 200 the output comes out as hundreds of thousands (lakhs) of files, and loading those again in Spark is not a good story.
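For reference, a minimal sketch of what the commented-out repartition in the code further below would do: repartitioning by the same columns passed to partitionBy puts all rows for a given (DataPartition, PartitionYear, PartitionStatement) combination into a single task, which keeps the file count per output directory down. Here spark, df and outputFileURL are placeholders for the active session, the data frame being written and the target path:

import spark.implicits._  // spark: the active SparkSession (placeholder)

// Shuffle so that each partition-column combination lands in one task,
// then write using the same columns as the output directory layout.
df.repartition($"DataPartition", $"PartitionYear", $"PartitionStatement")
  .write
  .partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
  .format("csv")
  .option("delimiter", "\t")
  .option("header", "true")
  .save(outputFileURL)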

In the image below, I do not know why sort is being called again after the Project.

In the image below, the GC looks too high to me. Do I have to handle this? Please suggest how.

Below is the health status of the nodes. At this point the data is being saved into S3, and no wonder I can see that only two nodes are active and the rest are idle.

These are the cluster details while the data is loading. At this point I can see that the cluster is fully utilized, but while saving the data into S3 many nodes are idle.

Finally, here is my code where I perform the join and then save into S3...

import org.apache.spark.sql.expressions._

          val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").desc)
          val latestForEachKey = df2resultTimestamp.withColumn("rank", row_number.over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

          val columnMap = latestForEachKey.columns.filter(c => c.endsWith("_1") & c != "FFAction|!|_1").map(c => c -> c.dropRight(2)) :+ ("FFAction|!|_1", "FFAction|!|")
          val exprs = columnMap.map(t => coalesce(col(s"${t._1}"), col(s"${t._2}")).as(s"${t._2}"))
          val exprsExtended = Array(col("uniqueFundamentalSet"), col("PeriodId"), col("SourceId"), col("StatementTypeCode"), col("StatementCurrencyId"), col("FinancialStatementLineItem_lineItemId")) ++ exprs

          // Joining both data frames here
          val dfMainOutput = (dataMain.join(latestForEachKey, Seq("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId"), "outer") select (exprsExtended: _*)).filter(!$"FFAction|!|".contains("D|!|"))
          // Join ends here

          val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"PartitionYear", $"PartitionStatement", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").filter(_ != "PartitionStatement").map(c => col(c)): _*).as("concatenated"))

          val headerColumn = dataHeader.columns.toSeq

          val headerFinal = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

          val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", headerFinal)

          // dfMainOutputFinalWithoutNull.repartition($"DataPartition", $"PartitionYear", $"PartitionStatement")
          dfMainOutputFinalWithoutNull.write
            .partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
            .format("csv")
            .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
            .option("nullValue", "")
            .option("delimiter", "\t")
            .option("quote", "\u0000")
            .option("header", "true")
            .option("codec", "bzip2")
            .save(outputFileURL)

Recommended answer

S3 is an object store, not a file system, hence the issues arising from eventual consistency and non-atomic rename operations: every time the executors write the result of a job, each of them writes to a temporary directory outside the main directory (on S3) where the files have to end up, and once all the executors are done a rename is performed to get atomic exclusivity. This is all fine on a standard filesystem like HDFS, where renames are instantaneous, but on an object store like S3 it is not, since renames on S3 are done at roughly 6 MB/s.

To overcome the above problem, make sure you set the following two configuration parameters:

1) spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2

With the default value of this parameter, i.e. 1, commitTask moves the data generated by a task from the task temporary directory to the job temporary directory, and when all tasks complete, commitJob moves the data from the job temporary directory to the final destination. Because the driver is doing the work of commitJob, for S3 this operation can take a long time, and a user may often think that his/her cell is "hanging". However, when the value of mapreduce.fileoutputcommitter.algorithm.version is 2, commitTask moves the data generated by a task directly to the final destination and commitJob is basically a no-op.

2) spark.speculation=false

If this parameter is set to true, then when one or more tasks run slowly in a stage they are re-launched. As mentioned above, the write operation to S3 from a Spark job is very slow, so we can see a lot of tasks getting re-launched as the output data size increases.

This, along with eventual consistency (while moving files from the temporary directory to the main data directory), may cause the FileOutputCommitter to go into a deadlock, and hence the job could fail.
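As a sketch, both parameters can be set when building the SparkSession (or equivalently passed as --conf flags to spark-submit, or through an EMR spark-defaults classification); the application name below is only a placeholder:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("emr-s3-write")  // placeholder name
  // v2 committer: commitTask writes straight to the final destination, commitJob is essentially a no-op
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  // do not re-launch slow tasks; speculative attempts make slow S3 writes worse
  .config("spark.speculation", "false")
  .getOrCreate()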

Alternatively

You can write the output first to the local HDFS on EMR and then move the data to S3 using the hadoop distcp command. This improves the overall output speed drastically. However, you will need enough EBS storage on your EMR nodes to make sure all of your output data fits.
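A rough sketch of that two-step approach, reusing the data frame from the code above; the HDFS path and the bucket name are placeholders, and on EMR s3-dist-cp is a common alternative to distcp:

// Step 1: write to HDFS on the cluster, where renames are cheap (path is illustrative)
dfMainOutputFinalWithoutNull.write
  .partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
  .format("csv")
  .option("delimiter", "\t")
  .option("header", "true")
  .option("codec", "bzip2")
  .save("hdfs:///tmp/main_output/")

// Step 2: copy the finished output from HDFS to S3 outside of Spark, e.g.
//   hadoop distcp hdfs:///tmp/main_output/ s3://your-bucket/main_output/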

Additionally, you can write the output data in ORC format, which will compress the output size considerably.
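A minimal sketch of the same write in ORC instead of CSV (df stands for whichever data frame is being written; the CSV-specific delimiter/quote options simply no longer apply):

df.write
  .partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
  .format("orc")
  .save(outputFileURL)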

Reference:

https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98
