Spark optimization - joins - very low number of tasks - OOM


Problem description


My Spark application fails with this error: Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
This is what I get when I inspect the container log: java.lang.OutOfMemoryError: Java heap space

My application mainly gets one table and then joins it with different tables that I read from AWS S3:

var result = readParquet(table1)
val table2 = readParquet(table2)
result = result.join(table2, result(primaryKey) === table2(foreignKey))

val table3 = readParquet(table3)
result = result.join(table3, result(primaryKey) === table3(foreignKey))

val table4 = readParquet(table4)
result = result.join(table4, result(primaryKey) === table4(foreignKey))

and so on
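The readParquet helper is not shown in the question; a minimal sketch of what it might look like, assuming each table is a Parquet dataset under an S3 prefix (the bucket and path below are hypothetical placeholders):

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("join-pipeline").getOrCreate()

// Hypothetical helper: read one table's Parquet files from S3 (path is a placeholder)
def readParquet(table: String): DataFrame =
  spark.read.parquet(s"s3://my-bucket/warehouse/$table")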

My application fails when I try to save my result dataframe to PostgreSQL using:

result.toDF(df.columns.map(x => x.toLowerCase()): _*).write
  .mode("overwrite")
  .format("jdbc")
  .option(JDBCOptions.JDBC_TABLE_NAME, table)
  .save()
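(Note that df is not defined in the snippet above; presumably it has the same columns as result. For a complete JDBC write, the connector also needs a URL, driver, and credentials; the values below are placeholders, not taken from the question:)

result.toDF(result.columns.map(_.toLowerCase): _*)
  .write
  .mode("overwrite")
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/mydb")  // placeholder connection URL
  .option("dbtable", table)                              // same key as JDBCOptions.JDBC_TABLE_NAME
  .option("user", "dbuser")                              // placeholder credentials
  .option("password", "secret")
  .option("driver", "org.postgresql.Driver")
  .save()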

On my failed join stage I have a very low number of tasks: 6 tasks for 4 executors.

Why does my stage generate 2 jobs?

The first one completes with 426 tasks:

and the second one fails:

My spark-submit conf :

dynamicAllocation = true  
num core = 2
driver memory = 6g
executor memory = 6g
max num executor = 10
min num executor = 1
spark.default.parallelism = 400
spark.sql.shuffle.partitions = 400

I tried with more resources, but the same problem occurred:

 num core = 5
 driver memory = 16g
 executor memory = 16g
 num executor = 20

I think that all the data goes to the same partition/executor even with the default of 400 partitions, and this causes an OOM error.

I tried the following, without success:
- persisting the data,
- a broadcast join, but my tables are not small enough to broadcast at the end,
- repartitioning to a higher number (4000) and doing a count between each join to force an action (sketched below):
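A sketch of that last attempt, reconstructed from the description above using the same variable names:

// Repartition each intermediate result and force materialization with a count
result = result.join(table2, result(primaryKey) === table2(foreignKey)).repartition(4000)
println(s"after table2: ${result.count()} rows")

result = result.join(table3, result(primaryKey) === table3(foreignKey)).repartition(4000)
println(s"after table3: ${result.count()} rows")

// ... and so on for the remaining tables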

My main table seems to grow very fast:
(number of rows) 40 -> 68 -> 7304 -> 946 832 -> 123 032 864 -> 246 064 864 -> (taking too much time after that)
However, the data size seems very low.

If I look at the task metrics, an interesting thing is that my data seems skewed (I am really not sure).
In the last count action, I can see that ~120 tasks actually perform work, each with ~10MB of input data for 100 records and taking 12 seconds, while the other 3880 tasks do absolutely nothing (3 ms, 0 records, 16 B of input (metadata?)):
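One way to confirm the skew suspicion (not shown in the original post) is to look at the join-key distribution on both sides before joining; a handful of keys with very large counts would explain both the row explosion and the few overloaded tasks. A minimal sketch, assuming primaryKey and foreignKey hold the column names used above:

import org.apache.spark.sql.functions.desc

// Most frequent join keys on the accumulated result and on one of the joined tables
result.groupBy(primaryKey).count().orderBy(desc("count")).show(20, truncate = false)
table2.groupBy(foreignKey).count().orderBy(desc("count")).show(20, truncate = false)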

Solution

driver memory = 16g is too high and not needed. Use that much only when you have a huge amount of data to bring back to the driver with actions like collect(), and in that case make sure to increase spark.driver.maxResultSize as well.

You can do the following things:

-- Repartition while reading the files: readParquet(table1).repartition(x). Alternatively, if one of the tables is small, you can broadcast it and remove the join entirely, using mapPartitions with the broadcast variable as a lookup cache instead (see the sketch after this list).

(OR)

-- Select a column that is uniformly distributed and repartition your table accordingly using that particular column.
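A hedged sketch of both suggestions, reusing the names from the question. The lookup example assumes the key is a String and that the column fetched from table2 is called "value"; both are illustrative assumptions, and the sketch keeps only the key and the looked-up value rather than the full row.

import org.apache.spark.sql.functions.col

// Option A: control the partitioning explicitly, either with a fixed count at read
// time or by an evenly distributed column (the second suggestion above)
val bigTable      = readParquet(table1).repartition(400)
val bigTableByKey = readParquet(table1).repartition(400, col(primaryKey))

// Option B (only if table2 really fits in memory): replace the shuffle join with a
// broadcast lookup applied inside mapPartitions
val lookup: Map[String, String] =
  table2.select(col(foreignKey), col("value"))   // "value" is an illustrative column name
    .collect()
    .map(row => row.getString(0) -> row.getString(1))
    .toMap
val lookupBc = spark.sparkContext.broadcast(lookup)

import spark.implicits._

val enriched = bigTable.mapPartitions { rows =>
  val cache = lookupBc.value                     // one deserialized copy per executor
  rows.map { r =>
    val key = r.getAs[String](primaryKey)
    (key, cache.get(key).orNull)
  }
}.toDF(primaryKey, "value")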

Two points I need to stress after looking at the above stats: your job has a high scheduling delay, which is caused by too many tasks, and in your task stats a few tasks are launched with 10 bytes of input data while a few are launched with 9MB... obviously there is data skew here. Also, as you said, the first job completed with 426 tasks, but with a repartition count of 4000 it should launch more tasks.

Please look at https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c for more insights.
