Out of memory error when collecting data out of Spark cluster

Problem description

I know there are plenty of questions on SO about out of memory errors on Spark but I haven't found a solution to mine.

I have a simple workflow:

  1. read in ORC files from Amazon S3
  2. filter down to a small subset of rows
  3. select a small subset of columns
  4. collect into the driver node (so I can do additional operations in R)

When I run the above and then cache the table into Spark memory, it takes up <2GB - tiny compared to the memory available to my cluster - yet I get an OOM error when I try to collect the data to my driver node.

I have tried running on the following setups:

  • local mode on a computer with 32 cores and 244GB of ram
  • standalone mode with 10 x 6.2 GB executors and a 61GB driver node

For each of these I have played with numerous configurations of executor.memory, driver.memory, and driver.maxResultSize to cover the full range of possible values within my available memory, but I always end up with an out of memory error at the collect stage; either java.lang.OutOfMemoryError: Java heap space, java.lang.OutOfMemoryError: GC overhead limit exceeded, or Error in invoke_method.spark_shell_connection(spark_connection(jobj), : No status is returned. (a sparklyr error indicative of memory issues).
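
For reference, these properties map onto sparklyr configuration keys roughly as follows (a minimal sketch; the values shown are just examples from the many combinations I tried):

# Memory-related settings passed to spark_connect() - values are illustrative only
library(sparklyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- '50g'   # driver.memory
config$`sparklyr.shell.executor-memory` <- '6g'  # executor.memory
config$spark.driver.maxResultSize <- '10g'       # driver.maxResultSize
sc <- spark_connect(master='local', config=config, version='2.0.1')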

Based on my [limited] understanding of Spark, caching a table prior to collecting should force all calculations - i.e. if the table is sitting happily in memory after caching at <2GB, then I shouldn't need much more than 2GB of memory to collect it into the driver node.

Note that answers to this question have some suggestions I am yet to try, but these are likely to impact performance (e.g. serialising the RDD) so would like to avoid using if possible.
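
For what it's worth, the serialised-caching suggestion would look roughly like the sketch below in sparklyr. This is untested on my side and assumes a sparklyr version that exposes sdf_persist(); MEMORY_ONLY_SER stores the data serialised, trading CPU for a smaller memory footprint:

# Untested sketch: persist with a serialised storage level instead of using tbl_cache()
dftbl_ser <- sdf_persist(dftbl, storage.level = 'MEMORY_ONLY_SER')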

My questions:

  1. How can a dataframe that takes up so little space once cached be causing memory problems?
  2. Is there anything obvious I should check/change/troubleshoot to help fix the problem, before moving on to other options that might impact performance?

Thanks

EDIT: In response to @Shaido's comment below, calling cache via sparklyr "forces data to be loaded in memory by executing a count(*) over the table" [from the sparklyr documentation] - i.e. the table should be sitting in memory and all the calculations run (I believe) prior to calling collect.

EDIT: Some additional observations since following the suggestions below:

  • As per the comments below, I have now tried writing the data to csv instead of collecting to get an idea of likely file size (a sketch of this check is shown just after this list). This operation creates a set of csvs amounting to ~3GB, and takes only 2 seconds when run after caching.
  • If I set driver.maxResultSize to <1G I get an error stating that the size of the serialized RDD is 1030 MB, larger than driver.maxResultSize.
  • If I watch memory usage in Task Manager after calling collect I see that usage just keeps going up until it reaches ~ 90GB, at which point the OOM error occurs. So for whatever reason the amount of RAM being used to perform the collect operation is ~100x greater than the size of the RDD I'm trying to collect.
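
The write-to-csv check in the first observation above was along these lines (the output path below is a placeholder):

# Write the filtered table straight to disk instead of collecting it to the driver
# (illustrative path; this produced ~3GB of csvs and ran in ~2 seconds after caching)
spark_write_csv(dftbl, 's3a://some-bucket/filtereddf-csv')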

code added below, as requested in comments.

#__________________________________________________________________________________________________________________________________

# Set parameters used for filtering rows
#__________________________________________________________________________________________________________________________________

firstDate <- '2017-07-01'
maxDate <- '2017-08-31'
advertiserID <- '4529611'
advertiserID2 <- '4601141'
advertiserID3 <- '4601141'

library(dplyr)
library(stringr)
library(sparklyr)

#__________________________________________________________________________________________________________________________________

# Configure & connect to spark
#__________________________________________________________________________________________________________________________________

Sys.setenv("SPARK_MEM"="100g")
Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop") 

config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3
Sys.setenv(AWS_ACCESS_KEY_ID="")
Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions

# Specify memory parameters - have tried lots of different values here!
config$`sparklyr.shell.driver-memory` <- '50g' 
config$`sparklyr.shell.executor-memory` <- '50g'
config$spark.driver.maxResultSize <- '50g'
sc <- spark_connect(master='local', config=config, version='2.0.1')

#__________________________________________________________________________________________________________________________________

# load data into spark from S3 ----
#__________________________________________________________________________________________________________________________________

#+++++++++++++++++++
# create spark table (not in memory yet) of all logfiles within logfiles path
#+++++++++++++++++++

spark_session(sc) %>%
  invoke("read") %>% 
  invoke("format", "orc") %>%
  invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>% 
  invoke("createOrReplaceTempView", "alldatadf") 
alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory

#+++++++++++++++++++
# define variables used to filter table down to daterange
#+++++++++++++++++++

# Calculate firstDate & maxDate as unix timestamps
unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1
unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1

# Convert daterange params into date_year, date_month & date_day values to pass to filter statement
dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1))
years <- unique(substring(dateRange, first=1, last=4))
if(length(years)==1) years <- c(years, years)
year_y1 <- years[1]; year_y2 <- years[2]
months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7)
minMonth_y1 <- min(months_y1)
maxMonth_y1 <- max(months_y1)
months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7)
minMonth_y2 <- min(months_y2)
maxMonth_y2 <- max(months_y2) 

# Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time)
firstDateMinusOne <- as.Date(firstDate)-1
firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4)
firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7) 
firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10)
maxDatePlusOne <- as.Date(maxDate)+1
maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4)
maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7)
maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10)

#+++++++++++++++++++
# Read in data, filter & select
#+++++++++++++++++++

# startTime <- proc.time()[3]
dftbl <- alldftbl %>% # create a reference to the sparkdf without loading into memory

  # filter by month and year, using ORC partitions for extra speed
  filter(((date_year==year_y1  & date_month>=minMonth_y1 & date_month<=maxMonth_y1) |
            (date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) |
            (date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) |
            (date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>%

  # filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind.
  filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>%

  # filter by advertiser ID
  filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) & 
            !is.na(advertiser_id)) |
           ((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 | 
               floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>%

  # Define cols to keep
  transmute(time=as.numeric(event_time/1000000),
            user_id=as.character(user_id),
            action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA),
            lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id),
            activity_lookup=as.character(activity_id),
            sv1=as.character(segment_value_1),
            other_data=as.character(other_data))  %>%
  mutate(time_char=as.character(from_unixtime(time)))

# cache to memory
dftbl <- sdf_register(dftbl, "filtereddf")
tbl_cache(sc, "filtereddf")

#__________________________________________________________________________________________________________________________________

# Collect out of spark
#__________________________________________________________________________________________________________________________________

myDF <- collect(dftbl)

Answer

When you call collect on the dataframe, two things happen:

  1. First, all the data has to be written to the driver's output.
  2. The driver has to collect the data from all the nodes and keep it in its memory.

Answer:

If you are just looking to load the data into the executors' memory, count() is also an action that will load the data into the executors' memory, which can be used by other processes.
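
In sparklyr terms that could look something like the sketch below, reusing the table name registered in the question; the count runs on the executors and only a single number comes back to the driver:

# Materialise the filtered table in executor memory without pulling the rows to the driver
tbl_cache(sc, 'filtereddf')       # cache the registered table
sdf_nrow(tbl(sc, 'filtereddf'))   # an action (a count); only the row count is returned to the driver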

If you want to extract the data, then try this along with other properties when pulling the data: "--conf spark.driver.maxResultSize=10g".
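
When connecting through sparklyr rather than spark-submit, the same property can be set in the config before spark_connect(), much as the code in the question already does (the value is illustrative):

# Raise the limit on the total size of serialised results returned to the driver
config <- spark_config()
config$spark.driver.maxResultSize <- '10g'
sc <- spark_connect(master = 'local', config = config)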
