Spark: Writing RDD Results to File System is Slow

Problem Description

I'm developing a Spark application with Scala. My application consists of only one operation that requires shuffling (namely cogroup). It runs flawlessly and in a reasonable time. The issue I'm facing is when I want to write the results back to the file system; for some reason, it takes longer than running the actual program. At first, I tried writing the results without re-partitioning or coalescing, and I realized that the number of generated files is huge, so I thought that was the issue. I tried re-partitioning (and coalescing) before writing, but the application took a long time performing these tasks. I know that re-partitioning (and coalescing) is costly, but is what I'm doing the right way? If it's not, could you please give me hints on what the right approach is?

Notes:

  • My file system is Amazon S3.
  • My input data size is around 130GB.
  • My cluster consists of one driver node and five slave nodes, each with 16 cores and 64 GB of RAM.
  • I allocate 15 executors for my job, each with 5 cores and 19GB of RAM (a configuration sketch follows this list).
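
As context for those numbers, here is a minimal sketch of how that executor allocation might be expressed when building the session; the application name and the exact submission mechanism are assumptions, since the question does not show them:

import org.apache.spark.sql.SparkSession

// Hypothetical session setup matching the executor numbers listed above.
val spark = SparkSession.builder()
  .appName("cogroup-job")                     // hypothetical application name
  .config("spark.executor.instances", "15")   // 15 executors
  .config("spark.executor.cores", "5")        // 5 cores each
  .config("spark.executor.memory", "19g")     // 19GB each
  .getOrCreate()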

P.S. I tried using Dataframes, same issue.

Here is a sample of my code just in case:

val sc = spark.sparkContext

// loading the samples
val samplesRDD = sc
  .textFile(s3InputPath)
  .filter(_.split(",").length > 7)
  .map(parseLine)
  .filter(_._1.nonEmpty) // skips any un-parsable lines


// pick random samples 
val samples1Ids = samplesRDD
  .map(_._2._1) // map to id
  .distinct
  .takeSample(withReplacement = false, 100, 0)

// broadcast it to the cluster's nodes
val samples1IdsBC = sc.broadcast(samples1Ids)

val samples1RDD = samplesRDD
  .filter(sample => samples1IdsBC.value.contains(sample._2._1))

val samples2RDD = samplesRDD
  .filter(sample => !samples1IdsBC.value.contains(sample._2._1))

// compute: for each timestamp, pair every sample in samples1 with the ids of
// the samples2 entries that fall within range of it, then write as CSV lines
samples1RDD
  .cogroup(samples2RDD)
  .flatMapValues { case (left, right) =>
    left.map(sample1 => (sample1._1, right.filter(sample2 => isInRange(sample1._2, sample2._2)).map(_._1)))
  }
  .map {
    case (timestamp, (sample1Id, sample2Ids)) =>
      s"$timestamp,$sample1Id,${sample2Ids.mkString(";")}"
  }
  .repartition(10)
  .saveAsTextFile(s3OutputPath)
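
Since the question mentions trying coalesce as well, here is a minimal sketch of how it is typically applied before a write: unlike repartition, coalesce merges existing partitions without a full shuffle. The name resultRDD is hypothetical and stands for the output of the cogroup/map chain above:

// Hedged alternative to the repartition(10) used above; coalesce avoids a full shuffle.
// "resultRDD" is a hypothetical name for the RDD produced by the cogroup/map chain.
resultRDD
  .coalesce(10)
  .saveAsTextFile(s3OutputPath)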

Update

Here is the same code using Dataframes:

import scala.collection.mutable

import org.apache.spark.sql
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.functions.{collect_list, struct}
import spark.implicits._

// loading the samples
val samplesDF = spark
  .read
  .csv(inputPath)
  .drop("_c1", "_c5", "_c6", "_c7", "_c8")
  .toDF("id", "timestamp", "x", "y")
  .withColumn("x", ($"x" / 100.0f).cast(sql.types.FloatType))
  .withColumn("y", ($"y" / 100.0f).cast(sql.types.FloatType))

// pick random ids as samples 1
val samples1Ids = samplesDF
  .select($"id") // map to the id
  .distinct
  .rdd
  .takeSample(withReplacement = false, 1000)
  .map(r => r.getAs[String]("id"))

// broadcast it to the executor
val samples1IdsBC = sc.broadcast(samples1Ids)

// get samples 1 and 2
val samples1DF = samplesDF
  .where($"id" isin (samples1IdsBC.value: _*))

val samples2DF = samplesDF
  .where(!($"id" isin (samples1IdsBC.value: _*)))

samples2DF
  .withColumn("combined", struct("id", "lng", "lat"))
  .groupBy("timestamp")
  .agg(collect_list("combined").as("combined_list"))
  .join(samples1DF, Seq("timestamp"), "rightouter")
  .map {
    case Row(timestamp: String, samples: mutable.WrappedArray[GenericRowWithSchema], sample1Id: String, sample1X: Float, sample1Y: Float) =>
      val sample2Info = samples.filter {
        case Row(_, sample2X: Float, sample2Y: Float) =>
          Misc.isInRange((sample2X, sample2Y), (sample1X, sample1Y), 20)
        case _ => false
      }.map {
        case Row(sample2Id: String, sample2X: Float, sample2Y: Float) =>
          s"$sample2Id:$sample2X:$sample2Y"
        case _ => ""
      }.mkString(";")

      (timestamp, sample1Id, sample1X, sample1Y, sample2Info)
    case Row(timestamp: String, _, sample1Id: String, sample1X: Float, sample1Y: Float) => // no overlapping samples
      (timestamp, sample1Id, sample1X, sample1Y, "")
    case _ =>
      ("error", "", 0.0f, 0.0f, "")
  }
  .where($"_1" notEqual "error")
  //      .show(1000, truncate = false)
  .write
  .csv(outputPath)

Answer

The issue here is that Spark normally commits tasks and jobs by renaming files, and on S3 renames are really, really slow. The more data you write, the longer it takes at the end of the job. That is what you are seeing.

Fix: switch to the S3A committers, which don't do any renames.
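
A hedged sketch of how that switch might look when configuring the session, assuming Hadoop 3.1+ with the S3A committers available and the spark-hadoop-cloud module on the classpath (settings and class names vary by version, so check the official S3A committer documentation):

import org.apache.spark.sql.SparkSession

// Sketch only: enable an S3A committer (here the "directory" staging committer).
val spark = SparkSession.builder()
  // route s3a:// output through the S3A committer factory (covers the RDD write path)
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
          "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  // choose a committer that avoids renames: "directory", "partitioned", or "magic"
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  // bind the DataFrame/SQL write path to the Hadoop PathOutputCommitter machinery
  .config("spark.sql.sources.commitProtocolClass",
          "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
          "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()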

Some tuning options to massively increase the number of threads for IO, commits, and the connection pool size:

  • fs.s3a.threads.max: raise from the default of 10 to something bigger
  • fs.s3a.committer.threads: the number of files committed by a POST in parallel; default is 8
  • fs.s3a.connection.maximum: try (fs.s3a.committer.threads + fs.s3a.threads.max + 10)
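
As a rough illustration, these options could be raised through Spark's spark.hadoop.* pass-through; the numbers below are placeholders, not tuned recommendations:

import org.apache.spark.sql.SparkSession

// Illustrative placeholders only; tune for your own workload.
val tunedSpark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.threads.max", "64")         // default is 10
  .config("spark.hadoop.fs.s3a.committer.threads", "32")   // default is 8
  .config("spark.hadoop.fs.s3a.connection.maximum", "106") // committer.threads + threads.max + 10
  .getOrCreate()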

These defaults are all fairly small because many jobs work with multiple buckets, and if each one had big numbers it would be really expensive to create an S3A client... but if you have many thousands of files, it's probably worthwhile.
