How to tune spark job on EMR to write huge data quickly on S3


Problem description

I have a Spark job where I am doing an outer join between two data frames. The first data frame is 260 GB of text files split into 2200 files, and the second data frame is 2 GB. Writing the joined output, which is about 260 GB, to S3 takes a very long time; I cancelled it after more than 2 hours because I was being charged heavily for the EMR cluster.

Here is my cluster information:

emr-5.9.0
Master:    m3.2xlarge
Core:      r4.16xlarge   10 machines (each machine has 64 vCore, 488 GiB memory, EBS storage: 100 GiB)

This is the cluster configuration I am setting; a small sketch for checking what these settings resolve to at runtime follows the list.

capacity-scheduler  yarn.scheduler.capacity.resource-calculator :org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
emrfs-site  fs.s3.maxConnections:   200
spark   maximizeResourceAllocation: true
spark-defaults  spark.dynamicAllocation.enabled:    true
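
Since maximizeResourceAllocation and dynamic allocation only set values that EMR computes at cluster start, it can be worth confirming what they actually resolved to. The following is only a sketch and assumes an active SparkSession named `spark`:

// Sketch only: print the executor and dynamic-allocation settings that the
// classifications above actually produced (assumes a SparkSession named `spark`).
spark.conf.getAll
  .filter { case (k, _) =>
    k.startsWith("spark.dynamicAllocation") || k.startsWith("spark.executor")
  }
  .foreach { case (k, v) => println(s"$k = $v") }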

I also tried setting the memory components manually as below; the performance was better, but the job was again taking a very long time:

--num-executors 60 --conf spark.yarn.executor.memoryOverhead=9216 --executor-memory 72G --conf spark.yarn.driver.memoryOverhead=3072 --driver-memory 26G --executor-cores 10 --driver-cores 3 --conf spark.default.parallelism=1200
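
For reference, the same flags can also be written as Spark configuration entries. This is only an illustrative sketch (the application name is made up, and on YARN the executor sizing still has to be fixed before the application starts, e.g. via spark-submit or spark-defaults):

import org.apache.spark.sql.SparkSession

// Illustrative only: the submit flags above expressed as config entries.
val spark = SparkSession.builder()
  .appName("outer-join-to-s3") // hypothetical name
  .config("spark.executor.instances", "60")
  .config("spark.executor.memory", "72g")
  .config("spark.executor.cores", "10")
  .config("spark.yarn.executor.memoryOverhead", "9216")
  .config("spark.driver.memory", "26g")
  .config("spark.yarn.driver.memoryOverhead", "3072")
  .config("spark.driver.cores", "3")
  .config("spark.default.parallelism", "1200")
  .getOrCreate()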

I am not using the default partitioning to save the data into S3.

Adding all details about the job and the query plan so that it is easy to understand.

The real reason is partitioning, and that is taking most of the time. Because I have 2K input files, if I repartition to something like 200, the output comes out as lakhs (hundreds of thousands) of files, and loading that again in Spark is not a good story.
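
One common way to keep the output file count under control while still using partitionBy is to repartition on the same columns first, so that all rows of one output partition are written by a single task. This is only a sketch with placeholder names (df, outputPath), not the exact job:

import org.apache.spark.sql.functions.col

// Sketch with placeholder names: hash-partition on the partitionBy columns so
// each partition directory is written by exactly one task and ends up with one
// file, instead of lakhs of small files.
df.repartition(200, col("DataPartition"), col("PartitionYear"), col("PartitionStatement"))
  .write
  .partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
  .format("csv")
  .option("codec", "bzip2")
  .save(outputPath)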

In the image below, I don't know why a Sort is called again after the Project step.

In the image below, the GC time looks too high to me. Do I have to handle this? Please suggest.

Below is the node health status. At this point the data is being saved into S3, and I can see only two nodes are active while all the others are idle.

Here are the cluster details while the data is being loaded. At this point I can see the cluster is fully utilized, but while saving data into S3 many of the nodes are idle.

Finally, here is my code where I perform the join and then save the result into S3:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
// Needed for the $"..." column syntax below; assumes an active SparkSession named `spark`.
import spark.implicits._

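          // Keep only the latest record per key: rank the rows in each key group
          // by TimeStamp (descending) and keep rank 1.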
          val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").desc)
          val latestForEachKey = df2resultTimestamp.withColumn("rank", row_number.over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

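          // Build coalesce expressions: for every "_1"-suffixed column coming from
          // the update frame, prefer its value and fall back to the base column
          // with the same name after the join.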
          val columnMap = latestForEachKey.columns.filter(c => c.endsWith("_1") && c != "FFAction|!|_1").map(c => c -> c.dropRight(2)) :+ ("FFAction|!|_1", "FFAction|!|")
          val exprs = columnMap.map(t => coalesce(col(s"${t._1}"), col(s"${t._2}")).as(s"${t._2}"))
          val exprsExtended = Array(col("uniqueFundamentalSet"), col("PeriodId"), col("SourceId"), col("StatementTypeCode"), col("StatementCurrencyId"), col("FinancialStatementLineItem_lineItemId")) ++ exprs

          //Joining both data frames here
          val dfMainOutput = (dataMain.join(latestForEachKey, Seq("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId"), "outer") select (exprsExtended: _*)).filter(!$"FFAction|!|".contains("D|!|"))
          //Join ends here

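          // Replace nulls with "" and collapse every non-partition column into a
          // single "|^|"-delimited string column named "concatenated".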
          val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"PartitionYear", $"PartitionStatement", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").filter(_ != "PartitionStatement").map(c => col(c)): _*).as("concatenated"))

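          // Build the output header: join the header column names with "|^|" (the
          // trailing "|!|" added by mkString is dropped again) and use the result
          // as the name of the single concatenated output column.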
          val headerColumn = dataHeader.columns.toSeq

          val headerFinal = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

          val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "\\|\\^\\|null", "")).withColumnRenamed("concatenated", headerFinal)

          dfMainOutputFinalWithoutNull
            // .repartition($"DataPartition", $"PartitionYear", $"PartitionStatement")
            .write
            .partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
            .format("csv")
            .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
            .option("nullValue", "")
            .option("delimiter", "\t")
            .option("quote", "\u0000")
            .option("header", "true")
            .option("codec", "bzip2")
            .save(outputFileURL)
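
As a quick sanity check (sketch only), one can look at how many tasks feed the write; with partitionBy, each task writes one file per distinct partition combination it holds, so this number is the main lever on how many objects end up in S3:

          // Sketch: number of tasks that will write output files.
          println(dfMainOutputFinalWithoutNull.rdd.getNumPartitions)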

Recommended answer

You are running five c3.4xlarge EC2 instances, each of which has 30 GB of RAM. That is only 150 GB in total, which is much smaller than the >200 GB data frame to be joined, hence a lot of disk spill. Maybe you can launch r-type EC2 instances (memory optimized, as opposed to the compute-optimized c type) instead, and see whether there is a performance improvement.
