R parallel - parRapply not working properly
Problem Description
I'm doing some unit testing on a package in development. One of the tests is failing. Specifically, I have parallel versions of the code and non-parallel versions. The non-parallel version works perfectly. The parallel version fails a unit test, with a seemingly nonsensical error.
## load my development package.
library(devtools) # for install_github
install_github("alexwhitworth/imputation")
## do some setup:
library(imputation)
library(kernlab)
library(parallel)
x1 <- matrix(rnorm(200), 20, 10)
x1[x1 > 1.25] <- NA
x3 <- create_canopies(x1, n_canopies= 5, q= 2)
prelim <- imputation:::impute_prelim(x3[[1]], parallel= TRUE, leave_cores= 1)
opt_h <- (4 * sd(x3[[1]][, -ncol(x3[[1]])], na.rm=T)^5 / (3 * nrow(x3[[1]])))^(1/5)
kern <- rbfdot(opt_h)
## write 2 identical functions:
## one in parallel
## one not in parallel
foo_parallel <- function(x_missing, x_complete, k, q, leave_cores) {
  cl <- makeCluster(detectCores() - leave_cores)
  x_missing_imputed <- parRapply(cl= cl, x_missing, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID,]))
    # calculate distances
    distances <- imputation:::dist_q.matrix(x=rbind(x_complete[x_comp_rowID, ],
                                                    x_complete[-x_comp_rowID,]), ref= 1L, q= q)
    return(distances)
  }, x_complete= x_complete)
  stopCluster(cl)
  return(x_missing_imputed)
}
foo_nonparallel <- function(x_missing, x_complete, k, q) {
  x_missing_imputed <- t(apply(x_missing, 1, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID,]))
    # calculate distances
    distances <- imputation:::dist_q.matrix(x=rbind(x_complete[x_comp_rowID, ],
                                                    x_complete[-x_comp_rowID,]), ref= 1L, q= q)
    return(distances)
  }, x_complete= x_complete))
  return(x_missing_imputed)
}
## test them
foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 1) # fails
foo_nonparallel(prelim$x_missing, x3[[1]],k=3,q=2) # works
Error in checkForRemoteErrors(val) : 2 nodes produced errors; first error: ref must be an integer in {1, nrow(x)}.
As you can see, ref is clearly defined as ref= 1L, which is in {1, nrow(x)}.
What is going on with the interaction with library(parallel) that is causing this error?
Edit: I'm on a Windows machine:
R> sessionInfo()
R version 3.2.2 (2015-08-14)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 7 x64 (build 7601) Service Pack 1
Recommended Answer
I have figured out what is causing the problem. This seems to me to be a library(parallel) bug or edge case, specific to the parallelized versions of the apply functions (in this case parRapply). Perhaps someone older and wiser can explain why library(parallel) doesn't catch this edge case.
The issue seems to be related to the number of tasks vs. the number of workers available. On my machine, I have an 8-core processor. In this case, there are 5 tasks (one for each row of prelim$x_missing).
Granted, in typical use, I wouldn't be parallelizing work for 5 rows. This is just a unit test.
R> prelim$x_missing
X1 X2 X3 X4 X5 X6 X7 X8 X9 X10 d_factor
6 6 0.2604170 -0.5966874 NA NA -0.3013053 0.24313272 0.2836760 0.3977164 -0.60711109 -0.2929253 1
7 7 -0.8540576 0.1409047 NA 0.4801685 -0.9324517 -0.06487733 -0.2220201 NA 1.19077335 -0.3702607 2
8 8 0.5118453 -0.8750674 NA 0.1787238 0.6897163 0.20695122 NA -0.3488021 0.84200408 -0.4791230 1
12 12 0.3695746 -0.4919277 -1.2509180 1.1642152 NA 0.04018417 NA NA -0.53436589 -1.5400345 2
15 15 NA -0.3608242 -0.6761515 -0.5366562 0.1763501 NA NA 0.4967595 0.02635203 -0.6015536 1
Note that I am making the cluster via cl <- parallel::makeCluster(detectCores() - leave_cores), where detectCores() returns 8 on my current machine. The function accepts a parameter, leave_cores, for the number of cores to leave open. When I make a cluster with more cores/nodes than there are rows in the use case, the function fails. When I make a cluster with at most as many nodes as rows, the function works:
# works : detectCores() == 8, 8 - 3 == 5 (number of rows / processes)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 3)
[1] 1.0216313 0.7355635 0.9201501 0.6906554 0.6613939 0.3628872 0.9995641 0.8571252 0.9271800 0.9201501 0.9238215 0.9798824 0.9059506
[14] 0.6891484 1.0158223 0.5442953 0.6906554 0.9238215 0.8607280 0.5897955 1.1084943 0.8518322 0.9227102 0.6613939 0.9798824 0.8607280
[27] 0.9518105 0.9792209 1.1968528 0.4447104 0.3628872 0.9059506 0.5897955 0.9518105 1.1249624
# fails : 8-2 = 6; 6 > nrow(prelim$x_missing)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 2)
Error in checkForRemoteErrors(val) :
one node produced an error: ref must be an integer in {1, nrow(x)}.
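Since the failure shows up only when the cluster has more nodes than there are rows, one workaround is to cap the cluster size at the number of rows. The sketch below assumes a hypothetical helper, n_workers_for(), which is not part of parallel or of the imputation package; foo_parallel could then build its cluster with makeCluster(n_workers_for(nrow(x_missing), leave_cores)).

```r
library(parallel)

# Hypothetical helper (not from 'parallel' or 'imputation'):
# never start more workers than rows to process, and always at least one.
n_workers_for <- function(n_rows, leave_cores = 1L, available = detectCores()) {
  # detectCores() may return NA on some platforms; fall back to one worker
  if (is.na(available)) available <- 1L
  as.integer(max(1L, min(available - leave_cores, n_rows)))
}

n_workers_for(5, leave_cores = 1, available = 8)    # 5: capped at the 5 rows
n_workers_for(5, leave_cores = 3, available = 8)    # 5
n_workers_for(100, leave_cores = 1, available = 8)  # 7: capped by free cores
```

In the runs shown above, the function worked whenever the node count was at most nrow(prelim$x_missing), which is the condition this cap enforces.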
tl;dr
As described in the R parallel vignette, detectCores() simply detects the cores; very reasonably, it does not attempt any intelligent assignment of tasks to workers.
Function detectCores() tries to determine the number of CPU cores in the machine on which R is running: it has ways to do so on all known current R platforms. What exactly it measures is OS-specific: we try where possible to report the number of physical cores available. On Windows the default is to report the number of logical CPUs. On modern hardware (e.g. Intel Core i7) the latter may not be unreasonable as hyper-threading does give a significant extra throughput.
I am calling the function parallel::parRapply to do the computation. parRapply dispatches the work to the workers via the splitRows function, but splitRows doesn't seem to have any intelligence or error-catching.
R> parRapply
function (cl = NULL, x, FUN, ...)
{
    cl <- defaultCluster(cl)
    do.call(c, clusterApply(cl = cl, x = splitRows(x, length(cl)),
        fun = apply, MARGIN = 1L, FUN = FUN, ...), quote = TRUE)
}
<bytecode: 0x00000000380ca530>
<environment: namespace:parallel>
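As the parRapply source shows, each worker runs apply(chunk, MARGIN = 1L, FUN = FUN) on its chunk, so a worker that receives a chunk with no rows still calls apply(). As we read base R's apply() source, it has a special case for zero-extent input: it calls FUN once on a dummy row of zeros to guess the type of the empty result. A base-R-only sketch of that behaviour, using a made-up 0 x 4 matrix:

```r
# Base R only: count calls to FUN when apply() gets a matrix with zero rows,
# which is what a worker receives for an empty chunk.
calls <- 0L
seen <- NULL
empty_chunk <- matrix(numeric(0), nrow = 0, ncol = 4)

res <- apply(empty_chunk, 1, function(row) {
  calls <<- calls + 1L  # record each invocation
  seen <<- row          # keep the argument FUN was given
  row
})

calls  # 1: FUN was still called once
seen   # 0 0 0 0: a dummy row of zeros, not a real row of data
```

If this reading is right, an empty chunk is not a no-op: FUN runs once with input it was never written to expect.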
I can't find the source code for splitRows, but parallel::splitIndices seems similar:
R> parallel:::splitIndices
function (nx, ncl)
{
    i <- seq_len(nx)
    if (ncl == 0L)
        list()
    else if (ncl == 1L || nx == 1L)
        list(i)
    else {
        fuzz <- min((nx - 1L)/1000, 0.4 * nx/ncl)
        breaks <- seq(1 - fuzz, nx + fuzz, length = ncl + 1L)
        structure(split(i, cut(i, breaks)), names = NULL)
    }
}
<bytecode: 0x00000000380a7828>
<environment: namespace:parallel>
In my unit test, this would execute as the following:
# all 8 cores:
nx <- 5; ncl <- 8
i <- seq_len(nx)
fuzz <- min((nx - 1L)/1000, 0.4 * nx / ncl)
breaks <- seq(1 - fuzz, nx + fuzz, length= ncl + 1L)
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1
[[2]]
integer(0)
[[3]]
[1] 2
[[4]]
integer(0)
[[5]]
[1] 3
[[6]]
[1] 4
[[7]]
integer(0)
[[8]]
[1] 5
There are three integer(0) elements here, and these cause the failure further down the call stack.
# 3 cores (just showing the return):
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1 2
[[2]]
[1] 3
[[3]]
[1] 4 5
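To see what the workers receive, here is a sketch that assumes splitRows simply subsets the matrix with the row indices from splitIndices(). The helper split_rows_sketch() is a hypothetical stand-in written for illustration, not the package's own function; empty index vectors then become zero-row matrices:

```r
library(parallel)

# Hypothetical stand-in for the internal splitRows(): assumes it cuts the row
# indices with splitIndices() and keeps the matrix shape with drop = FALSE.
split_rows_sketch <- function(x, ncl) {
  lapply(splitIndices(nrow(x), ncl), function(i) x[i, , drop = FALSE])
}

x <- matrix(rnorm(5 * 3), nrow = 5)  # 5 rows, like prelim$x_missing

chunks8 <- split_rows_sketch(x, 8)   # more workers than rows
chunks3 <- split_rows_sketch(x, 3)   # fewer workers than rows

vapply(chunks8, nrow, integer(1))    # three of the eight chunks have 0 rows
vapply(chunks3, nrow, integer(1))    # every chunk has at least one row
```

With 8 chunks, each zero-row matrix is still shipped to a worker, which matches the integer(0) elements in the 8-core output above.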
If anyone can provide a link in the comments below to the source code for splitRows, I'll happily update this answer. The code for parallel::clusterApply and parallel:::staticClusterApply is easy to find.
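One possible path from an empty chunk to this particular error message (our inference; we have not checked the source of imputation:::dist_q.matrix): base R's apply() calls FUN once on a row of zeros when its input has no rows, so inside the question's FUN rowID becomes 0. No row is named "0", so which() returns integer(0), and negative indexing with an empty vector selects no rows rather than all rows. The matrix passed to dist_q.matrix would then have zero rows, and ref= 1L cannot lie in {1, nrow(x)}. The base-R sketch below reproduces only the indexing step, with a made-up x_complete:

```r
# Base R only; x_complete is a made-up stand-in with row names like those above.
x_complete <- matrix(rnorm(4 * 3), nrow = 4,
                     dimnames = list(c("6", "7", "8", "12"), NULL))

rowID <- 0  # what the dummy row of zeros yields for i[1]
x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
length(x_comp_rowID)  # 0: no row is named "0"

stacked <- rbind(x_complete[x_comp_rowID, ],   # selects no rows
                 x_complete[-x_comp_rowID, ])  # -integer(0) also selects no rows
nrow(stacked)  # 0, so a check that ref is in 1..nrow(x) must fail
```

The non-parallel version never hits this path because apply() over the full prelim$x_missing has no empty input.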