spark out of memory multiple iterations


Problem description

I have a Spark job (running on Spark 1.3.1) that has to iterate over several keys (about 42) and process each one. Here is the structure of the program:

  1. Get the keys from a map
  2. Fetch the data matching the key from Hive (on hadoop-yarn underneath) as a data frame
  3. Process the data
  4. Write the results to Hive

When I run this for one key, everything works fine. When I run with 42 keys, I get an out-of-memory exception around the 12th iteration. Is there a way I can clean up memory between iterations? Help appreciated.

Here is the high-level code that I am working with.

import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;

public abstract class SparkRunnableModel {

    public static SparkContext sc = null;
    public static JavaSparkContext jsc = null;
    public static HiveContext hiveContext = null;
    public static SQLContext sqlContext = null;

    protected SparkRunnableModel(String appName) {
        // Get the system properties to set up the model and
        // create the Spark contexts from the application name
        SparkConf conf = new SparkConf().setAppName(appName);
        sc = new SparkContext(conf);
        jsc = new JavaSparkContext(sc);

        // Create a Hive context on top of the Spark context
        hiveContext = new HiveContext(sc);

        // SQL context
        sqlContext = new SQLContext(sc);
    }

    public abstract void processModel(Properties properties) throws Exception;
}

class ModelRunnerMain(model: String) extends SparkRunnableModel(model) with Serializable {

  override def processModel(properties: Properties) = {
    val dataLoader = DataLoader.getDataLoader(properties)

    // loads a keys data frame from a keys table in Hive and converts it to a list
    val keysList = dataLoader.loadSeriesData()

    for (key <- keysList) {
      runModelForKey(key, dataLoader)
    }
  }

  def runModelForKey(key: String, dataLoader: DataLoader) = {

    // loads a data frame from a table (~50 cols x 800 rows) using "select * from table where key='<key>'"
    val keyDataFrame = dataLoader.loadKeyData()

    // filter this data frame into two data frames
    ...

    // join them to transpose
    ...

    // convert the data frame into an RDD
    ...

    // run map on the RDD to add a bunch of new columns
    ...
  }

}

My data frame size is under a meg, but I create several data frames from it by selecting, joining, etc. I assume all of these get garbage collected once the iteration is done.

Here is the configuration I am running with.

  • spark.eventLog.enabled: true
  • spark.broadcast.port: 7086
  • spark.driver.memory: 12g
  • spark.shuffle.spill: false
  • spark.serializer: org.apache.spark.serializer.KryoSerializer
  • spark.storage.memoryFraction: 0.7
  • spark.executor.cores: 8
  • spark.io.compression.codec: lzf
  • spark.shuffle.consolidateFiles: true
  • spark.shuffle.service.enabled: true
  • spark.master: yarn-client
  • spark.executor.instances: 8
  • spark.shuffle.service.port: 7337
  • spark.rdd.compress: true
  • spark.executor.memory: 48g
  • spark.executor.id:
  • spark.sql.shuffle.partitions: 700
  • spark.cores.max: 56
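
For reference, below is a minimal sketch (not the asker's setup) of how a subset of these properties could be set programmatically through SparkConf; in practice they are more likely supplied via spark-defaults.conf or spark-submit --conf. The object and application name are hypothetical; the property names and values are taken from the list above.

import org.apache.spark.{SparkConf, SparkContext}

object ConfSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors a subset of the settings listed above. A setting such as
    // spark.driver.memory only takes effect when passed to spark-submit or
    // placed in spark-defaults.conf, so it is left out here.
    val conf = new SparkConf()
      .setAppName("model-runner") // hypothetical application name
      .setMaster("yarn-client")
      .set("spark.executor.instances", "8")
      .set("spark.executor.cores", "8")
      .set("spark.executor.memory", "48g")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.storage.memoryFraction", "0.7")
      .set("spark.sql.shuffle.partitions", "700")

    val sc = new SparkContext(conf)
    // ... build the Hive/SQL contexts and run the model as in SparkRunnableModel ...
    sc.stop()
  }
}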

Here is the exception I am getting.

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
at org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
at com.ning.compress.lzf.ChunkEncoder.encodeAndWriteChunk(ChunkEncoder.java:264)
at com.ning.compress.lzf.LZFOutputStream.writeCompressedBlock(LZFOutputStream.java:266)
at com.ning.compress.lzf.LZFOutputStream.write(LZFOutputStream.java:124)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:124)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)

Answer

Using checkpoint() or localCheckpoint() can cut the Spark lineage and improve the performance of the application across iterations.
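
Below is a minimal sketch, not the asker's code, of how checkpointing might be wired into the per-key loop: the key list and the per-key RDD are stand-ins for the real DataLoader logic, and the checkpoint directory path is hypothetical. The stack trace above fails on the driver while serializing broadcast data for a new stage, which is consistent with the lineage (and the state that has to be shipped for each stage) growing with every iteration; checkpointing truncates that lineage.

import org.apache.spark.{SparkConf, SparkContext}

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-sketch"))

    // Reliable checkpoints need a directory visible to every executor; this path is hypothetical.
    sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

    // Stand-in for the key list loaded from Hive in the asker's code.
    val keysList = Seq("key1", "key2", "key3")

    for (key <- keysList) {
      // Stand-in for the per-key data frame / RDD built in runModelForKey.
      val keyRdd = sc.parallelize(1 to 1000).map(i => (key, i * 2))

      // checkpoint() marks the RDD to be saved to the checkpoint directory; once an
      // action materializes it, its lineage is truncated so the DAG does not keep
      // growing from one iteration to the next.
      keyRdd.checkpoint()
      keyRdd.count() // force an action so the checkpoint is actually written

      // ... continue processing keyRdd and write the results to Hive ...
    }

    sc.stop()
  }
}

localCheckpoint(), available in newer Spark releases, truncates the lineage the same way but stores the data on the executors instead of a reliable file system, trading fault tolerance for speed.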
