Variable scope in boot in a multiclustered parallel approach


Problem description


I'm trying to figure out how to pass functions and packages to the boot() function when running parallel computations. It seems very expensive to load a package or define functions inside a loop. The foreach() function that I often use for other parallel tasks has .packages and .export arguments that handle this nicely (see this SO question), but I can't figure out how to do this with the boot package.
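For reference, this is roughly how those arguments are used with foreach (a minimal sketch assuming a doParallel backend; it only illustrates .export and .packages, since boot() offers no direct equivalent):

library(foreach)
library(doParallel)
library(parallel)

myMean <- function(x) mean(x)
x <- runif(1000)

cl <- makePSOCKcluster(4)
registerDoParallel(cl)

res <- foreach(r = 1:100, .combine = c,
               .export   = "myMean",          # explicitly ship the helper function
               .packages = "boot") %dopar% {  # load a package on each worker
  myMean(x[sample(seq_along(x), replace = TRUE)])
}

stopCluster(cl)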

Below is a meaningless example that shows what happens when switching to parallel:

library(boot)
myMean <- function(x) mean(x)
meaninglessTest <- function(x, i){
  return(myMean(x[i]))
}

x <- runif(1000)

bootTest <- function(){
  out <- boot(data=x, statistic=meaninglessTest, R=10000, parallel="snow", ncpus=4)
  return(boot.ci(out, type="perc"))
}

bootTest()

This complains (as expected) that it can't find myMean.

Sidenote: When running this example it runs slower than one-core, probably because splitting this simple task over multiple cores is more time consuming than the actual task. Why isn't the default to split into even job batches of R/ncpus - is there a reason why this isn't default behavior?

Update on the sidenote: As Steve Weston noted, the parLapply that boot() uses actually splits the job into even batches/chunks. The function is a neat wrapper for clusterApply:

docall(c, clusterApply(cl, splitList(x, length(cl)), lapply, 
    fun, ...))

I'm a little surprised that this doesn't perform better when I scale up the number of repetitions:

> library(boot)
> set.seed(10)
> x <- runif(1000)
> 
> Reps <- 10^4
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="no")
> Sys.time()-start_time
Time difference of 0.52335 secs
> 
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="snow", ncpus=4)
> Sys.time()-start_time
Time difference of 3.539357 secs
> 
> Reps <- 10^5
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="no")
> Sys.time()-start_time
Time difference of 5.749831 secs
> 
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="snow", ncpus=4)
> Sys.time()-start_time
Time difference of 23.06837 secs

I hope that this is only due to the very simple mean function and that more complex cases behave better. I must admit that I find it a little disturbing, as the cluster initialization time should be the same in the 10,000 and 100,000 cases, yet the absolute time difference increases and the 4-core version takes 5 times longer. I guess this must be an effect of the list merging, as I can't find any other explanation for it.
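One way to probe this would be to time the underlying parallel step on its own. A rough sketch (the chunk sizes and the 4-worker cluster are assumptions that mimic 10^5 replicates split over 4 workers, not boot's exact internals):

library(parallel)

x <- runif(1000)
cl <- makePSOCKcluster(4)

system.time({
  # one chunk of 25,000 bootstrap means per worker
  chunks <- parLapply(cl, rep(25000, 4), function(n, dat) {
    replicate(n, mean(sample(dat, replace = TRUE)))
  }, dat = x)
  res <- do.call(c, chunks)   # the merge step suspected of costing time
})

stopCluster(cl)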

Solution

If the function to be executed in parallel (meaninglessTest in this case) has extra dependencies (such as myMean), the standard solution is to export those dependencies to the cluster via the clusterExport function. That requires creating a cluster object and passing it to boot via the "cl" argument:

library(boot)
library(parallel)

myMean <- function(x) mean(x)
meaninglessTest <- function(x, i){
  return(myMean(x[i]))
}

cl <- makePSOCKcluster(4)      # create the cluster object once, up front
clusterExport(cl, 'myMean')    # ship the helper function to every worker

x <- runif(1000)

bootTest <- function() {
  out <- boot(data=x, statistic=meaninglessTest, R=10000,
              parallel="snow", ncpus=4, cl=cl)   # reuse the prepared cluster
  return(boot.ci(out, type="perc"))
}

bootTest()
stopCluster(cl)                # shut the workers down when finished

Note that once the cluster workers have been initialized, they can be used by boot many times and do not need to be reinitialized, so it isn't that expensive.
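For instance (continuing the snippet above as a sketch, with stopCluster deferred until the very end), the same cl can back several boot() calls, and the exports made with clusterExport persist between them:

out1 <- boot(data=x, statistic=meaninglessTest, R=10000,
             parallel="snow", ncpus=4, cl=cl)
out2 <- boot(data=x, statistic=meaninglessTest, R=10000,
             parallel="snow", ncpus=4, cl=cl)
stopCluster(cl)   # only once you are completely done with the cluster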

To load packages on the cluster workers, you can use clusterEvalQ:

clusterEvalQ(cl, library(randomForest))

That's nice and simple, but for more complex worker initialization, I usually create a "worker init" function and execute it via clusterCall, which is perfect for executing a function once on each of the workers.
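A minimal sketch of that pattern (initWorker is a hypothetical name, reusing the cl created above):

initWorker <- function() {
  library(randomForest)                                        # load packages on the worker
  assign("myMean", function(x) mean(x), envir = globalenv())   # define helper functions there
  NULL                                                         # keep the returned value small
}
clusterCall(cl, initWorker)   # runs initWorker once on each worker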

As for your side note, the performance is bad because the statistic function does so little work, as you say, but I'm not sure why you think that the work isn't being split evenly between the workers. The parLapply function is used to do the work in parallel in this case, and it does split the work evenly and rather efficiently, but that doesn't guarantee better performance than running sequentially using lapply. But perhaps I'm misunderstanding your question.
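As a quick check of that even split (a sketch, assuming the 4-worker cl from above, before it is stopped), clusterSplit uses the same kind of splitting and yields one roughly equal consecutive chunk per node:

lengths(clusterSplit(cl, seq_len(10000)))
# e.g. 2500 2500 2500 2500 on a 4-node cluster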
