R并行-parRapply无法正常工作 [英] R parallel - parRapply not working properly

查看:143
本文介绍了R并行-parRapply无法正常工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在对开发中的软件包进行一些单元测试.其中一项测试失败.具体来说,我有代码的并行版本和非并行版本.非并行版本可完美运行.并行版本未通过单元测试,但似乎没有道理的错误.

I'm doing some unit testing on a package in development. One of the tests is failing. Specifically, I have parallel versions of the code and non-parallel versions. The non-parallel version works perfectly. The parallel version fails a unit test, with a seemingly nonsensical error.

## load my development package.
library(devtools) # for install_github
install_github("alexwhitworth/imputation")

## do some setup:
library(imputation)
library(kernlab)
library(parallel)


x1 <- matrix(rnorm(200), 20, 10)
x1[x1 > 1.25] <- NA
x3 <- create_canopies(x1, n_canopies= 5, q= 2)
prelim <- imputation:::impute_prelim(x3[[1]], parallel= TRUE, leave_cores= 1)

opt_h <- (4 * sd(x3[[1]][, -ncol(x3[[1]])], na.rm=T)^5 / (3 * nrow(x3[[1]])))^(1/5)
kern <- rbfdot(opt_h)


## write 2 identical functions:
## one in parallel
## one not in parallel

foo_parallel <- function(x_missing, x_complete, k, q, leave_cores) {
  cl <- makeCluster(detectCores() - leave_cores)
  x_missing_imputed <- parRapply(cl= cl, x_missing, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID,]))

    # calculate distances
    distances <- imputation:::dist_q.matrix(x=rbind(x_complete[x_comp_rowID, ], 
                                                    x_complete[-x_comp_rowID,]), ref= 1L,  q= q)
    return(distances)
  }, x_complete= x_complete)
  stopCluster(cl)
  return(x_missing_imputed)
}

foo_nonparallel <- function(x_missing, x_complete, k, q) {
  x_missing_imputed <- t(apply(x_missing, 1, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID,]))

    # calculate distances
    distances <- imputation:::dist_q.matrix(x=rbind(x_complete[x_comp_rowID, ], 
                                                    x_complete[-x_comp_rowID,]), ref= 1L,  q= q)
    return(distances)
  }, x_complete= x_complete))
  return(x_missing_imputed)
}

## test them
foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 1) # fails
foo_nonparallel(prelim$x_missing, x3[[1]],k=3,q=2) # works

checkForRemoteErrors(val)中的错误: 2个节点产生错误;第一个错误:ref必须为{1,nrow(x)}中的整数.

Error in checkForRemoteErrors(val) : 2 nodes produced errors; first error: ref must be an integer in {1, nrow(x)}.

如您所见,ref明确定义为ref= 1L,位于1,nrow(x)中.

As you can see, ref is clearly defined as ref= 1L which is in 1, nrow(x).

library(parallel)的交互导致此错误的原因是什么?

What is going on with the interaction with library(parallel) that is causing this error?

编辑-我在Windows计算机上:

Edit - I'm on a windows machine:

R> sessionInfo()
R version 3.2.2 (2015-08-14)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 7 x64 (build 7601) Service Pack 1

推荐答案

我已经弄清楚是什么原因引起的.在我看来,这是library(parallel)错误/边缘情况,特定于apply函数的并行化版本(在本例中为parRapply).也许年龄较大,更明智的人可以解释为什么library(parallel)中没有遇到这种情况.

I have figured out what is causing the problem. This seems to me to be a library(parallel) bug / edge-case, specific to the parallelized versions of the apply functions (in this case parRapply). Perhaps someone older and wiser can explain why there isn't a catch in library(parallel) for this edge case.

问题似乎与任务数量与可用工作人员数量有关.在我的机器上,我有一个8核处理器.在这种情况下,有5个任务(prelim$x_missing的每一行一个).

The issue seems to be related to the number of tasks vs the number of workers available. On my machine, I have an 8-core processor. In this case, there are 5 tasks (one for each row of prelim$x_missing).

在通常的使用中,我不会并行处理5行的工作.这只是一个单元测试.

Granted, in typical use, I wouldn't be parallelizing work for 5 rows. This is just a unit test.

R> prelim$x_missing
              X1         X2         X3         X4         X5          X6         X7         X8          X9        X10 d_factor
6   6  0.2604170 -0.5966874         NA         NA -0.3013053  0.24313272  0.2836760  0.3977164 -0.60711109 -0.2929253        1
7   7 -0.8540576  0.1409047         NA  0.4801685 -0.9324517 -0.06487733 -0.2220201         NA  1.19077335 -0.3702607        2
8   8  0.5118453 -0.8750674         NA  0.1787238  0.6897163  0.20695122         NA -0.3488021  0.84200408 -0.4791230        1
12 12  0.3695746 -0.4919277 -1.2509180  1.1642152         NA  0.04018417         NA         NA -0.53436589 -1.5400345        2
15 15         NA -0.3608242 -0.6761515 -0.5366562  0.1763501          NA         NA  0.4967595  0.02635203 -0.6015536        1

请注意,我是通过cl <- parallel::makeCluster(detectCores() - leave_cores)建立集群的,其中detectCores()将为当前计算机返回8.函数调用接受一个参数,以使内核数保持打开状态leave_cores.当我创建一个用例中的行数比行数更多的内核/节点的集群时,该函数将失败.当我用< =行数创建集群时,该函数起作用:

Note that I am making the cluster via cl <- parallel::makeCluster(detectCores() - leave_cores) where detectCores() will return 8 for my current machine. The function call accepts a parameter for the number of cores to leave open leave_cores. When I make a cluster with more cores/nodes than rows in the use-case, the function fails. When I make a cluster with <= number of rows, the function works:

 # works : detectCores() == 8, 8 - 3 == 5 (number of rows / processes)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 3)
 [1] 1.0216313 0.7355635 0.9201501 0.6906554 0.6613939 0.3628872 0.9995641 0.8571252 0.9271800 0.9201501 0.9238215 0.9798824 0.9059506
[14] 0.6891484 1.0158223 0.5442953 0.6906554 0.9238215 0.8607280 0.5897955 1.1084943 0.8518322 0.9227102 0.6613939 0.9798824 0.8607280
[27] 0.9518105 0.9792209 1.1968528 0.4447104 0.3628872 0.9059506 0.5897955 0.9518105 1.1249624

# fails : 8-2 = 6; 6 > nrow(prelim$x_missing)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 2) 
Error in checkForRemoteErrors(val) : 
  one node produced an error: ref must be an integer in {1, nrow(x)}. 

tl,dr

rparallel小插图,detectCores用于简单地检测内核,它非常合理地不会尝试将任何智能任务分配给工作人员.

tl,dr

As described in the rparallel vignette, detectCores is used to simply detect the cores, it very reasonably does not attempt to do any intelligent assignment of tasks to workers.

function detectCores()函数试图确定运行R的计算机中的CPU内核数:它有办法在所有已知的当前R上执行此操作 平台.它确切衡量的是特定于OS的:我们尝试在可能的情况下报告可用物理内核的数量.在Windows上,默认设置是报告逻辑CPU的数量.在现代硬件(例如Intel Core i7)上,后者可能并非不合理,因为超线程确实提供了显着优势. 额外的吞吐量.

function detectCores() tries to determine the number of CPU cores in the machine on which R is running: it has ways to do so on all known current R platforms. What exactly it measures is OS-specific: we try where possible to report the number of physical cores available. On Windows the default is to report the number of logical CPUs. On modern hardware (e.g. Intel Core i7 ) the latter may not be unreasonable as hyper-threading does give a significant extra throughput.

我正在调用函数parallel::parRapply进行计算. parRapply通过splitRows功能将工作分派给工人.但是splitRows函数似乎没有任何智能或错误捕捉功能.

I am calling the function parallel::parRapply to do the computation. parRapply dispatches the work to the workers via the splitRows function. But there doesn't seem to be any intelligence or error-catching to the splitRows function.

R> parRapply
function (cl = NULL, x, FUN, ...) 
{
    cl <- defaultCluster(cl)
    do.call(c, clusterApply(cl = cl, x = splitRows(x, length(cl)), 
        fun = apply, MARGIN = 1L, FUN = FUN, ...), quote = TRUE)
}
<bytecode: 0x00000000380ca530>
<environment: namespace:parallel>

我找不到splitRows的源代码,但parallel::splitIndices似乎很相似:

I can't find the source code for splitRows but parallel::splitIndices seems similar:

R> parallel:::splitIndices
function (nx, ncl) 
{
    i <- seq_len(nx)
    if (ncl == 0L) 
        list()
    else if (ncl == 1L || nx == 1L) 
        list(i)
    else {
        fuzz <- min((nx - 1L)/1000, 0.4 * nx/ncl)
        breaks <- seq(1 - fuzz, nx + fuzz, length = ncl + 1L)
        structure(split(i, cut(i, breaks)), names = NULL)
    }
}
<bytecode: 0x00000000380a7828>
<environment: namespace:parallel>

在我的单元测试中,将按以下步骤执行:

In my unit test, this would execute as the following:

# all 8 cores:
nx <- 5; ncl <- 8
i <- seq_len(nx)
fuzz <- min((nx - 1L)/1000, 0.4 * nx / ncl)
breaks <- seq(1 - fuzz, nx + fuzz, length= ncl + 1L)
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1

[[2]]
integer(0)

[[3]]
[1] 2

[[4]]
integer(0)

[[5]]
[1] 3

[[6]]
[1] 4

[[7]]
integer(0)

[[8]]
[1] 5

在3个整数(0)的位置,这会导致调用栈中的失败.

Where there are 3 integer(0)s, which cause failure further down the call stack.

# 3 cores (just showing the return):
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1 2

[[2]]
[1] 3

[[3]]
[1] 4 5

如果任何人都可以在下面的注释中提供指向splitRows的源代码的链接,我将很乐意更新此答案. parallel::clusterApplyparallel:::staticClusterApply的代码很容易找到

If anyone can provide a link in the comments below to the source code for splitRows, I'll happily update this answer. Code for parallel::clusterApply and parallel:::staticClusterApply are easily found

这篇关于R并行-parRapply无法正常工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆