Sparklyr's spark_apply function seems to run on single executor and fails on moderately-large dataset


Problem description

I am trying to use spark_apply to run the R function below on a Spark table. This works fine if my input table is small (e.g. 5,000 rows), but when the table is moderately large (e.g. 5,000,000 rows) it throws an error after ~30 minutes: sparklyr worker rscript failure, check worker logs for details

Looking at the Spark UI shows that there is only ever a single task being created, and a single executor being applied to this task.

Can anyone give advice on why this function is failing for a 5-million-row dataset? Could the problem be that a single executor is being made to do all the work, and failing?

# Create data and copy to Spark
testdf <- data.frame(string_id=rep(letters[1:5], times=1000), # 5000 row table
                     string_categories=rep(c("", "1", "2 3", "4 5 6", "7"), times=1000))
testtbl <- sdf_copy_to(sc, testdf, overwrite=TRUE, repartition=100L, memory=TRUE)

# Write function to return dataframe with strings split out
myFunction <- function(inputdf){
  inputdf$string_categories <- as.character(inputdf$string_categories)
  inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories))
  stringCategoriesList <- strsplit(inputdf$string_categories, ' ')
  outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))),
                      string_categories=unlist(stringCategoriesList))
  return(outDF)
}

# Use spark_apply to run function in Spark
outtbl <- testtbl %>%
  spark_apply(myFunction,
              names=c('string_id', 'string_categories'))
outtbl

Recommended answer

  1. The sparklyr worker rscript failure, check worker logs for details error is written by the driver node and points out that the actual error needs to be found in the worker logs. Usually, the worker logs can be accessed by opening stdout from the Executors tab in the Spark UI; the logs should contain RScript: entries describing what the executor is processing and the specifics of the error.
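
  If you are running in local mode, the same log output can often be pulled directly from R as well; a minimal sketch (the filter string is only illustrative):

  spark_log(sc, n = 200, filter = "sparklyr")   # show recent log lines that mention sparklyr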

  2. Regarding the single task being created: when columns are not specified with types in spark_apply(), it needs to compute a subset of the result to guess the column types. To avoid this, pass explicit column types as follows:

outtbl <- testtbl %>%
  spark_apply(
    myFunction,
    columns = list(
      string_id = "character",
      string_categories = "character"))

  3. If using sparklyr 0.6.3, update to sparklyr 0.6.4 or devtools::install_github("rstudio/sparklyr"), since sparklyr 0.6.3 contains an incorrect wait time in some edge cases where package distribution is enabled and more than one executor runs in each node.
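
  A quick way to check which version is installed and to update it (a sketch; the CRAN release may well be newer than 0.6.4 by now):

  packageVersion("sparklyr")                      # version currently installed
  install.packages("sparklyr")                    # update from CRAN, or use
  # devtools::install_github("rstudio/sparklyr")  # the development build instead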

  4. Under high load, it is common to run out of memory. Increasing the number of partitions could resolve this issue, since it would reduce the amount of memory needed to process each partition of this dataset. Try running this as:

testtbl %>%
  sdf_repartition(1000) %>%
  spark_apply(myFunction, names=c('string_id', 'string_categories'))
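
  To confirm the repartitioning took effect before paying for the apply step, you can check the partition count first (assuming your sparklyr version provides sdf_num_partitions()):

  testtbl %>% sdf_repartition(1000) %>% sdf_num_partitions()   # should report 1000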

  5. It could also be the case that the function throws an exception for some of the partitions due to logic in the function. You could check whether this is happening by using tryCatch() to ignore the errors, then look for which values are missing from the output and why the function would fail for those values. I would start with something like:

myFunction <- function(inputdf){
  tryCatch({
    inputdf$string_categories <- as.character(inputdf$string_categories)
    inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories))
    stringCategoriesList <- strsplit(inputdf$string_categories, ' ')
    outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))),
                        string_categories=unlist(stringCategoriesList))
    return(outDF)
  }, error = function(e) {
    return(
      data.frame(string_id = c(0), string_categories = c("error"))
    )
  })
}
