Spark runs out of memory when grouping by key


Problem description

I am attempting to perform a simple transformation of Common Crawl data using Spark hosted on EC2, set up by following this guide. My code looks like this:

package ccminer

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ccminer {
  val english = "english|en|eng"
  val spanish = "es|esp|spa|spanish|espanol"
  val turkish = "turkish|tr|tur|turc"
  val greek = "greek|el|ell"
  val italian = "italian|it|ita|italien"
  val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")

  def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Bad command line")
      System.exit(-1)
    }

    val cluster = "spark://???"
    val sc = new SparkContext(cluster, "Common Crawl Miner",
      System.getenv("SPARK_HOME"), Seq("/root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar"))

    sc.sequenceFile[String, String](args(0))
      .map {
        // Normalize language markers in the key so variants group together
        case (k, v) => (langIndep(k), v)
      }
      .groupByKey(args(2).toInt) // args(2) = number of partitions for the shuffle
      .filter {
        // Keep only keys with more than one value
        case (_, vs) => vs.size > 1
      }
      .saveAsTextFile(args(1))
  }
}

And I am running it with the following command:

sbt/sbt "run-main ccminer.ccminer s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-* s3n://parallelcorpus/out/ 2000"

But very quickly it fails with an error like this:

java.lang.OutOfMemoryError: Java heap space
at com.ning.compress.BufferRecycler.allocEncodingBuffer(BufferRecycler.java:59)
at com.ning.compress.lzf.ChunkEncoder.<init>(ChunkEncoder.java:93)
at com.ning.compress.lzf.impl.UnsafeChunkEncoder.<init>(UnsafeChunkEncoder.java:40)
at com.ning.compress.lzf.impl.UnsafeChunkEncoderLE.<init>(UnsafeChunkEncoderLE.java:13)
at com.ning.compress.lzf.impl.UnsafeChunkEncoders.createEncoder(UnsafeChunkEncoders.java:31)
at com.ning.compress.lzf.util.ChunkEncoderFactory.optimalInstance(ChunkEncoderFactory.java:44)
at com.ning.compress.lzf.LZFOutputStream.<init>(LZFOutputStream.java:61)
at org.apache.spark.io.LZFCompressionCodec.compressedOutputStream(CompressionCodec.scala:60)
at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:803)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

So my basic question is: what is necessary to write a Spark task that can group by key over an almost unlimited amount of input without running out of memory?

Answer

The most common cause of java.lang.OutOfMemoryError exceptions in shuffle tasks (such as groupByKey, reduceByKey, etc.) is a low level of parallelism. With too few partitions, each shuffle task has to buffer a large fraction of the data in memory at once.

You can increase the default by setting the spark.default.parallelism property in your configuration.
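As a sketch only (the master URL, input path, and partition count below are placeholder values, not taken from the question), parallelism can be raised either globally via spark.default.parallelism or per shuffle by passing a partition count to groupByKey. The System.setProperty style matches the Spark 0.9-era API that the question's stack trace implies; on current Spark versions you would set this on a SparkConf instead. This is a configuration sketch and needs a running cluster to execute:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

// Global default: used by shuffles that do not specify a partition count.
// Set BEFORE creating the SparkContext. "2000" is an illustrative value;
// tune it so each partition's group data fits comfortably in executor heap.
System.setProperty("spark.default.parallelism", "2000")

val sc = new SparkContext("spark://master:7077", "Common Crawl Miner")

// Alternatively, set parallelism per operation: more partitions means
// smaller per-task groups, so each shuffle task needs less memory.
val grouped = sc.sequenceFile[String, String]("s3n://bucket/input/*")
  .groupByKey(2000) // explicit number of reduce-side partitions
```

A higher partition count trades more, smaller tasks for lower peak memory per task; if groups for a single key are themselves huge, no partition count helps, and restructuring the job (for example, aggregating with reduceByKey instead of materializing whole groups) is the usual next step.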
