How to fix the exception: java.lang.OutOfMemoryError: GC overhead limit exceeded even though enough memory is given in the spark-submit?

Problem description

I am trying to read a table from Postgres and insert the dataframe into a Hive table on HDFS in the following manner:

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, regexp_replace}
import org.apache.spark.sql.types.StringType

def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String, dataMapper: Map[String, String], partition_columns: Array[String], spark: SparkSession): DataFrame = {
  // Read the 2017 slice of the Postgres table over JDBC
  // (flagCol, connectionUrl, devUserName, devPassword and hiveDataTypes are class-level fields defined elsewhere)
  val execQuery = s"select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_forecast where period_year='2017'"
  val yearDF    = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017").option("user", devUserName).option("password", devPassword).option("numPartitions", 20).load()

  // Derive the Hive data types and prepare the target table schema
  val totalCols: List[String] = splitColumns ++ textList
  val cdt                     = new ChangeDataTypes(totalCols, dataMapper)
  hiveDataTypes               = cdt.gpDetails()
  prepareHiveTableSchema(hiveDataTypes, partition_columns)

  // Re-order the columns so the partition columns come last, matching the Hive partition layout
  val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
  val allCols        = allColsOrdered.map(colname => col(colname))
  val resultDF       = yearDF.select(allCols: _*)

  // Replace carriage returns/newlines and tabs in every string column with a single space
  val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
  val finalDF       = stringColumns.foldLeft(resultDF) {
    (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
  }
  finalDF
}

    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    dataDF.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE default.xx_gl_forecast PARTITION(${prtn_String_columns}) select * from preparedDF")

The spark-submit command I am using:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/username/jars/postgresql-42.1.4.jar  --jars /home/username/jars/postgresql-42.1.4.jar --num-executors 40 --executor-cores 10 --executor-memory 30g --driver-memory 20g --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/username/usr.keytab --principal usr@DEV.COM --files /username/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/username/jars/postgresql-42.1.4.jar

I have the following resources:

Number of cores: 51
Max container memory: 471040 MB
Number of executors per LLAP Daemon: 39

Even though I double the memory, I still see these exceptions in the log:

Container exited with a non-zero exit code 143.
Killed by external signal
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.String.toCharArray(String.java:2899)
at java.util.zip.ZipCoder.getBytes(ZipCoder.java:78)
at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
at java.util.jar.JarFile.getEntry(JarFile.java:240)
at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)
18/09/23 04:57:20 INFO JDBCRDD: closed connection

Is there anything wrong in the code that makes the program crash? Could anyone let me know what mistake I am making here so that I can fix it?

Recommended answer

This exception is telling you that you're spending a large amount of time garbage collecting. The first thing you should do is check the Spark UI while the job is running (or in the history server) to see which stage(s) are GCing a lot. You should be able to see it very clearly in the UI.
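
The answer above relies on the Spark UI. As a complementary check that is not part of the original answer, per-task GC time can also be surfaced in the driver log with a SparkListener; this is a minimal sketch, assuming Spark 2.x and the same spark session used in the question:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print how much of each finished task's run time was spent in GC,
// so heavily-GCing stages stand out in the driver log as well as in the UI.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      println(s"stage=${taskEnd.stageId} task=${taskEnd.taskInfo.taskId} " +
        s"gcTimeMs=${metrics.jvmGCTime} runTimeMs=${metrics.executorRunTime}")
    }
  }
})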

My guess is that it's going to be a shuffle. Now the questions are:

  • Do you have enough partitions for the size of your data?
  • If not, try to increase the default parallelism of the shuffle with spark.sql.shuffle.partitions (a minimal sketch follows this list).
  • If they are already big enough, what is filling up your heap? You may want to take a heap dump while the job is running and explore it with a dump analysis tool.
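
For the second point, here is a sketch of what raising the shuffle parallelism could look like, in the same spark.sql("set ...") style the question already uses; the value 400 is only an illustrative starting point, not a recommendation derived from this job:

// Raise the number of shuffle partitions so each task handles a smaller slice of data.
spark.sql("set spark.sql.shuffle.partitions=400")
// Equivalent programmatic form:
spark.conf.set("spark.sql.shuffle.partitions", "400")

// If the JDBC read itself yields too few partitions, an explicit repartition before the
// write spreads the rows (and the GC pressure) across more tasks:
val repartitionedDF = dataDF.repartition(400)
repartitionedDF.createOrReplaceTempView("preparedDF")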
