"Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used" on an EMR cluster with 75GB of memory


Problem description


I'm running a 5-node Spark cluster on AWS EMR, each node an m3.xlarge (1 master, 4 slaves). I successfully ran through a 146Mb bzip2 compressed CSV file and ended up with a perfectly aggregated result.

Now I'm trying to process a ~5GB bzip2 CSV file on this cluster but I'm receiving this error:

16/11/23 17:29:53 WARN TaskSetManager: Lost task 49.2 in stage 6.0 (TID xxx, xxx.xxx.xxx.compute.internal): ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

I'm confused as to why I'm getting a ~10.5GB memory limit on a ~75GB cluster (15GB per m3.xlarge instance)...

Here is my EMR config:

[
  {
    "classification": "spark-env",
    "properties": {},
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "PYSPARK_PYTHON": "python34"
        },
        "configurations": []
      }
    ]
  },
  {
    "classification": "spark",
    "properties": {
      "maximizeResourceAllocation": "true"
    },
    "configurations": []
  }
]

From what I've read, setting the maximizeResourceAllocation property should tell EMR to configure Spark to fully utilize all resources available on the cluster. I.e., I should have ~75GB of memory available... so why am I getting a ~10.5GB memory limit error? Here is the code I'm running:

import statistics
import sys

import arrow
import pyspark
import pyspark.sql
import pyspark.sql.functions

# SESSION_TIMEOUT and save_session are defined elsewhere in the original
# script (SESSION_TIMEOUT appears to be a datetime.timedelta, given the
# use of timeout.seconds below).


def sessionize(raw_data, timeout):
    # https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
    # Flag a new session whenever the gap to the previous event for the same
    # (user_id, site_id) exceeds the timeout, then build a running session id.
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp"))
    diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
            .over(window))
    time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
                 .withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0)))
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp")
              .rowsBetween(-1, 0))
    sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", "site_id", pyspark.sql.functions.sum("new_session").over(window))))
    return sessions


def aggregate_sessions(sessions):
    median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
    aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
        pyspark.sql.functions.first("site_id").alias("site_id"),
        pyspark.sql.functions.first("user_id").alias("user_id"),
        pyspark.sql.functions.count("id").alias("hits"),
        pyspark.sql.functions.min("timestamp").alias("start"),
        pyspark.sql.functions.max("timestamp").alias("finish"),
        median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
    )
    return aggregated


spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)
raw_data = spark_session.read.csv(sys.argv[1],
                                  header=True,
                                  inferSchema=True)
# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)
raw_data = raw_data.withColumn("timestamp",
                               convert_to_unix(pyspark.sql.functions.col("timestamp")))
sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)
aggregated.foreach(save_session)

Basically, nothing more than windowing and a groupBy to aggregate the data.

It starts with a few of those errors, and then the same error keeps piling up until the job eventually grinds to a halt.

I've tried running spark-submit with --conf spark.yarn.executor.memoryOverhead but that doesn't seem to solve the problem either.
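For reference, a minimal sketch of how that flag is usually passed with an explicit value (the 3 GB overhead, the script name, and the input path here are illustrative, not taken from the original job):

spark-submit \
    --deploy-mode cluster \
    --conf spark.yarn.executor.memoryOverhead=3072 \
    process_raw_data.py s3://my-bucket/raw-data.csv.bz2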

Solution

I feel your pain...

We had similar issues of running out of memory with Spark on YARN. We have five 64GB, 16-core VMs, and regardless of what we set spark.yarn.executor.memoryOverhead to, we just couldn't get enough memory for these tasks -- they would eventually die no matter how much memory we gave them. And it was a relatively straightforward Spark application that was causing this to happen.

We figured out that the physical memory usage was quite low on the VMs but the virtual memory usage was extremely high (despite the logs complaining about physical memory). We set yarn.nodemanager.vmem-check-enabled in yarn-site.xml to false and our containers were no longer killed, and the application appeared to work as expected.
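For illustration, the relevant yarn-site.xml entry looks roughly like this (a sketch of the setting described above, not a copy of our actual config; on EMR the same property can also be applied via a yarn-site configuration classification):

<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>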

Doing more research, I found the answer to why this happens here: http://web.archive.org/web/20190806000138/https://mapr.com/blog/best-practices-yarn-resource-management/

Since on Centos/RHEL 6 there is aggressive allocation of virtual memory due to OS behavior, you should disable the virtual memory checker or increase yarn.nodemanager.vmem-pmem-ratio to a relatively larger value.
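The alternative the quote mentions would look something like this in yarn-site.xml (the value of 5 is purely illustrative; YARN's default ratio is 2.1):

<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>5</value>
</property>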

That page had a link to a very useful page from IBM: https://web.archive.org/web/20170703001345/https://www.ibm.com/developerworks/community/blogs/kevgrig/entry/linux_glibc_2_10_rhel_6_malloc_may_show_excessive_virtual_memory_usage?lang=en

In summary, glibc > 2.10 changed its memory allocation. And although huge amounts of virtual memory being allocated isn't the end of the world, it doesn't work with the default settings of YARN.

Instead of setting yarn.nodemanager.vmem-check-enabled to false, you could also play with setting the MALLOC_ARENA_MAX environment variable to a low number in hadoop-env.sh. This bug report has helpful information about that: https://issues.apache.org/jira/browse/HADOOP-7154
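For example, a one-line sketch of that approach in hadoop-env.sh (4 is the value commonly discussed in that report; treat it as a starting point rather than a recommendation):

export MALLOC_ARENA_MAX=4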

I recommend reading through both pages -- the information is very handy.
