按键分组时,Spark 内存不足 [英] Spark runs out of memory when grouping by key
问题描述
我正在尝试使用 本指南,我的代码是这样的:
I am attempting to perform a simple transformation of common crawl data using Spark host on an EC2 using this guide, my code looks like this:
package ccminer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object ccminer {
val english = "english|en|eng"
val spanish = "es|esp|spa|spanish|espanol"
val turkish = "turkish|tr|tur|turc"
val greek = "greek|el|ell"
val italian = "italian|it|ita|italien"
val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|")
def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*")
def main(args: Array[String]): Unit = {
if (args.length != 3) {
System.err.println("Bad command line")
System.exit(-1)
}
val cluster = "spark://???"
val sc = new SparkContext(cluster, "Common Crawl Miner",
System.getenv("SPARK_HOME"), Seq("/root/spark/ccminer/target/scala-2.10/cc-miner_2.10-1.0.jar"))
sc.sequenceFile[String, String](args(0)).map {
case (k, v) => (langIndep(k), v)
}
.groupByKey(args(2).toInt)
.filter {
case (_, vs) => vs.size > 1
}
.saveAsTextFile(args(1))
}
}
我使用以下命令运行它:
And I am running it with the command as follows:
sbt/sbt "run-main ccminer.ccminer s3n://aws-publicdatasets/common-crawl/parse-output/segment/1341690165636/textData-* s3n://parallelcorpus/out/ 2000"
但很快它就失败了,错误如下
But very quickly it fails with errors as follows
java.lang.OutOfMemoryError: Java heap space
at com.ning.compress.BufferRecycler.allocEncodingBuffer(BufferRecycler.java:59)
at com.ning.compress.lzf.ChunkEncoder.<init>(ChunkEncoder.java:93)
at com.ning.compress.lzf.impl.UnsafeChunkEncoder.<init>(UnsafeChunkEncoder.java:40)
at com.ning.compress.lzf.impl.UnsafeChunkEncoderLE.<init>(UnsafeChunkEncoderLE.java:13)
at com.ning.compress.lzf.impl.UnsafeChunkEncoders.createEncoder(UnsafeChunkEncoders.java:31)
at com.ning.compress.lzf.util.ChunkEncoderFactory.optimalInstance(ChunkEncoderFactory.java:44)
at com.ning.compress.lzf.LZFOutputStream.<init>(LZFOutputStream.java:61)
at org.apache.spark.io.LZFCompressionCodec.compressedOutputStream(CompressionCodec.scala:60)
at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:803)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.BlockManager$$anonfun$5.apply(BlockManager.scala:471)
at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
所以我的基本问题是,编写一个可以按几乎无限量输入进行分组且不会耗尽内存的 Spark 任务需要什么?
So my basic question is, what is necessary to write a Spark task that can group by key with an almost unlimited amount of input without running out of memory?
推荐答案
shuffle任务(如groupByKey、reduceByKey等)中java.lang.OutOfMemoryError异常最常见的原因是并行化.
The most common cause of java.lang.OutOfMemoryError exceptions in shuffle tasks (such as groupByKey, reduceByKey, etc.) is low level of parallelism.
您可以通过在配置中设置spark.default.parallelism属性来增加默认值.
You can increase default value by setting spark.default.parallelism property in configuration.
这篇关于按键分组时,Spark 内存不足的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!