spark scalability: what am I doing wrong?
Question
I am processing data with spark and it works with a day worth of data (40G) but fails with OOM on a week worth of data:
import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
             .map(lambda row: (row.id, row.foo))
          for hour in myrange(beg, end, datetime.timedelta(0, 3600))]) \
  .reduceByKey(operator.add).saveAsTextFile("myoutput")
- The number of different IDs is less than 10k.
- Each ID is a smallish int.
- The job fails because too many executors fail with OOM.
- When the job succeeds (on small inputs), "myoutput" is about 100k.
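Since the per-key result is so small, it helps to see what reduceByKey(operator.add) actually computes. A minimal plain-Python sketch of the same per-id sum (no Spark; the sample pairs are made up):

```python
from collections import defaultdict
from operator import add

# Hypothetical stand-ins for the (row.id, row.foo) pairs the map step emits.
pairs = [(1, 10), (2, 5), (1, 7), (3, 1), (2, 2)]

# Local equivalent of .reduceByKey(operator.add): fold values per key.
sums = defaultdict(int)
for key, value in pairs:
    sums[key] = add(sums[key], value)

print(dict(sums))  # {1: 17, 2: 7, 3: 1}
```

With fewer than 10k distinct IDs, the reduced output is tiny, so whatever is blowing up memory, it is not the final result.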
- What am I doing wrong?
- I tried replacing saveAsTextFile with collect (because I actually want to do some slicing and dicing in Python before saving); there was no difference in behavior, same failure. Is this to be expected?
- I used to have reduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...]) instead of sc.union. Which is better? Does it make any difference?
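On the last point: both forms produce the same data, but reduce chains pairwise unions into a left-nested lineage 167 levels deep, while sc.union creates a single UnionRDD with 168 parents. A toy model (plain Python objects, not real RDDs) illustrates the difference in lineage depth:

```python
from functools import reduce

class Leaf:
    """Toy stand-in for one hourly RDD."""
    depth = 0

class Union:
    """Toy stand-in for a union node, tracking lineage depth."""
    def __init__(self, *parents):
        self.depth = 1 + max(p.depth for p in parents)

hours = [Leaf() for _ in range(168)]  # 7 * 24 hourly RDDs

nested = reduce(lambda x, y: Union(x, y), hours)  # reduce-style pairwise union
flat = Union(*hours)                              # sc.union-style single union

print(nested.depth, flat.depth)  # 167 1
```

A deep lineage mostly costs driver-side bookkeeping, so it is probably not the cause of executor OOMs, but sc.union is the cleaner choice.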
The cluster has 25 nodes with 825GB RAM and 224 cores among them.
The invocation is spark-submit --master yarn --num-executors 50 --executor-memory 5G.
A single RDD has ~140 columns and covers one hour of data, so a week is a union of 168(=7*24) RDDs.
Answer
Spark often runs into Out-Of-Memory errors when scaling up. In these cases the programmer has to fine-tune the configuration, and also recheck the code to make sure it does not do anything excessive, such as collecting all the big data in the driver, which is very likely to exceed the memoryOverhead limit no matter how high you set it.
To understand what is happening, you should know when YARN decides to kill a container for exceeding memory limits: that happens when the container goes beyond the memoryOverhead limit.
In the Scheduler you can check the Event Timeline to see what happened with the containers. If YARN has killed a container, it will appear in red, and when you hover over or click on it, you will see a message like:
Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
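The numbers in that message line up with the defaults: if spark.yarn.executor.memoryOverhead is unset, Spark-on-YARN requests the heap plus a small overhead, commonly max(384 MB, 10% of executor memory); the exact factor varies by Spark version, so treat this as a sketch:

```python
def default_yarn_overhead_mb(executor_memory_mb, factor=0.10, floor_mb=384):
    # Approximate default off-heap allowance YARN grants per container;
    # the factor/floor are version-dependent, check your Spark docs.
    return max(floor_mb, int(executor_memory_mb * factor))

heap_mb = 5 * 1024  # --executor-memory 5G, as in the question
overhead_mb = default_yarn_overhead_mb(heap_mb)
print(overhead_mb, heap_mb + overhead_mb)  # 512 5632
```

So each container may use only about 512 MB beyond the heap before YARN kills it, a budget that pyspark's off-heap-heavy workers can exceed easily.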
So in that case, what you want to focus on are these configuration properties (the values are examples from my cluster):
# More executor memory overhead
spark.yarn.executor.memoryOverhead 4096
# More driver memory overhead
spark.yarn.driver.memoryOverhead 8192
# Max on my nodes
#spark.executor.cores 8
#spark.executor.memory 12G
# For the executors
spark.executor.cores 6
spark.executor.memory 8G
# For the driver
spark.driver.cores 6
spark.driver.memory 8G
The first thing to do is to increase the memoryOverhead.
In the driver or in the executors?
When you are overviewing your cluster from the UI, you can click on the Attempt ID and check the Diagnostics Info, which should mention the ID of the container that was killed. If it is the same as your AM Container, then it's the driver; otherwise it's an executor.
That didn't resolve the issue, now what?
You have to fine-tune the number of cores and the heap memory you are providing. pyspark does most of its work in off-heap memory, so you don't want to give too much space to the heap, since that would be wasted. You also don't want to give too little, because the garbage collector will then have issues. Recall that these are JVMs.
As described here, a worker can host multiple executors, thus the number of cores used affects how much memory every executor has, so decreasing the #cores might help.
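As a rough sanity check of that trade-off on the asker's cluster (assuming RAM and cores are spread evenly across the 25 nodes, which is an assumption):

```python
nodes, total_ram_gb, total_cores = 25, 825, 224
ram_per_node_gb = total_ram_gb / nodes  # 33.0 GB per node
cores_per_node = total_cores // nodes   # 8 cores per node (integer share)

# --num-executors 50 with --executor-memory 5G puts 2 executors on each node;
# each container also carries the ~0.5 GB default memoryOverhead.
executors_per_node = 50 // nodes
container_gb = 5 + 0.5
used_gb = executors_per_node * container_gb
print(used_gb, ram_per_node_gb)  # 11.0 33.0
```

Only about a third of each node's RAM is committed, so there is plenty of headroom to raise executor memory or memoryOverhead: the kills come from the per-container overhead limit, not from node capacity.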
I have written this up in more detail in memoryOverhead issue in Spark and Spark – Container exited with a non-zero exit code 143, mostly so that I won't forget! Another option, which I haven't tried here, would be spark.default.parallelism and/or spark.storage.memoryFraction, which, based on my experience, didn't help.
You can pass configuration flags as sds mentioned, or like this:
spark-submit --properties-file my_properties
where "my_properties" is a file containing properties like those I listed above.
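For example, "my_properties" would just be a plain-text file of whitespace-separated key/value pairs, one per line (the values below mirror the examples above, not recommendations):

```
spark.yarn.executor.memoryOverhead  4096
spark.yarn.driver.memoryOverhead    8192
spark.executor.cores                6
spark.executor.memory               8G
spark.driver.cores                  6
spark.driver.memory                 8G
```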
For non-numerical values, you could do this:
spark-submit --conf spark.executor.memory='4G'