Run if loop in parallel

Question

I have a data set with ~4 million rows that I need to loop over. The structure is that there are repeated IDs whose rows depend on each other, while data is independent across IDs. For each ID, row [i+1] depends on row [i]. Here is a reproducible example. I realize the example is not practical in terms of the inner functions, but it simply demonstrates the structure I have.

set.seed(123)

id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)

month = rep(seq(1,5),3)

x = round(rnorm(15,2,5))
y = rep(0,15)

df = as.data.frame(cbind(ids,month,x,y))

for (i in 1:nrow(df)){
  if(i>1 && df[i,1]==df[i-1,1]){
    #Main functions go here
    df[i,4] = df[i-1,4]^2+df[i,3]
  }
  else {
    df[i,4] = 1
  }
}

The issue is that, in reality, 1000 iterations of the real function take ~90 seconds, so 4 million rows take days; running it this way isn't feasible for me. However, the IDs are independent and don't need to run together. My question is: is there a way to run this type of loop in parallel? A very non-elegant solution would be to split the file into 50 sections without splitting an ID and simply run the same code on the 50 sub-files. I figure there should be a way to code this, though.
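The "split without cutting an ID in half" idea above can be sketched in base R with split(), which partitions a data frame into a list of per-ID chunks; this is a sketch using the small example df built above, not part of the original question:

```r
# Sketch: partition the example data frame into per-ID chunks.
# split() guarantees an ID is never divided across chunks.
set.seed(123)
df <- data.frame(ids   = rep(1:3, each = 5),
                 month = rep(1:5, 3),
                 x     = round(rnorm(15, 2, 5)),
                 y     = 0)

chunks <- split(df, df$ids)  # a list of 3 data frames, one per ID
sapply(chunks, nrow)         # each chunk holds all 5 rows of its ID
```

Each element of chunks could then be processed independently, which is exactly the property the parallel approaches below rely on.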

EDIT: Added a month column to show why the rows depend on each other. To address two comments below:

1) There are actually 6-7 lines of functions to run. Could I use ifelse() with multiple functions?
2) The desired output would be the full data frame. In reality there are more columns, but I need every row in a data frame.

   ids month  x      y
1    1     1 -1      1
2    1     2  1      2
3    1     3 10     14
4    1     4  2    198
5    1     5  3  39207
6    2     1 11      1
7    2     2  4      5
8    2     3 -4     21
9    2     4 -1    440
10   2     5  0 193600
11   3     1  8      1
12   3     2  4      5
13   3     3  4     29
14   3     4  3    844
15   3     5 -1 712335

I've tried applying the foreach() package from another post, but it doesn't seem to work. The code will run, but I think the issue is the way rows are distributed among cores: if each row is sequentially sent to a different core, the same ID will never be on the same core.

library(foreach)
library(doParallel)


set.seed(123)

id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)

month = rep(seq(1,5),3)

x = round(rnorm(15,2,5))
y = rep(0,15)

df = as.data.frame(cbind(ids,month,x,y))

#setup parallel backend to use many processors
cores=detectCores()
cl <- makeCluster(cores[1]-1) #not to overload your computer
registerDoParallel(cl)

finalMatrix <- foreach(i=1:nrow(df), .combine=cbind) %dopar% {

  for (i in 1:nrow(df)){
    if(i>1 && df[i,1]==df[i-1,1]){
      #Main functions go here
      df[i,4] = df[i-1,4]^2+df[i,3]
    }
    else {
      df[i,4] = 1
    }
  }
}
#stop cluster
stopCluster(cl)

Answer

Here's a solution using foreach. It's hard to say how it would perform on your real-life data, but at least it works with the test data ...

First, I generate some test data:

# function to generate testdata

genDat <- function(id){

  # observations per id, fixed or random
  n <- 50
  #n <- round(runif(1,5,1000))

  return(

    data.frame(id=id,month=rep(1:12,ceiling(n/12))[1:n],x=round(rnorm(n,2,5)),y=rep(0,n))

  )
}

#generate testdata

testdat <- do.call(rbind,lapply(1:90000,genDat))


> head(testdat)
  id month  x y
1  1     1  7 0
2  1     2  6 0
3  1     3 -9 0
4  1     4  3 0
5  1     5 -9 0
6  1     6  8 0


> str(testdat)
'data.frame':   4500000 obs. of  4 variables:
 $ id   : int  1 1 1 1 1 1 1 1 1 1 ...
 $ month: int  1 2 3 4 5 6 7 8 9 10 ...
 $ x    : num  7 6 -9 3 -9 8 -4 13 0 5 ...
 $ y    : num  0 0 0 0 0 0 0 0 0 0 ...

So the test data has ~4.5 million rows with 90k unique IDs.

Now, since your calculations are independent between IDs, the idea is to ship the data for each unique ID off to a core ... this ultimately also gets rid of the need for an if or ifelse condition.

To do this, I first generate a matrix of start and stop row indices that splits the dataset by unique ID:

id_len <- rle(testdat$id)

ixmat <- cbind(c(1,head(cumsum(id_len$lengths)+1,-1)),cumsum(id_len$lengths))

This matrix can then be passed to foreach to run each ID's block of rows in parallel.
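To see what this index matrix looks like, here is the same rle/cumsum construction applied to the question's small 15-row example (three IDs, five rows each); each row of ixmat marks the first and last row belonging to one ID:

```r
# Build the start/stop index matrix for the question's small example:
# three IDs, five rows each.
ids <- rep(1:3, each = 5)

id_len <- rle(ids)                 # run lengths: 5, 5, 5
ixmat  <- cbind(c(1, head(cumsum(id_len$lengths) + 1, -1)),
                cumsum(id_len$lengths))
ixmat
#      [,1] [,2]
# [1,]    1    5
# [2,]    6   10
# [3,]   11   15
```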

In this example I modify your calculations slightly to avoid astronomical values leading to Inf.

library(parallel)
library(doParallel)
library(iterators)

cl <- makeCluster(parallel::detectCores())  # create a cluster
registerDoParallel(cl)                      # register the parallel backend


r <-  foreach (i = iter(ixmat,by='row')) %dopar% {

  x <- testdat$x[i[1,1]:i[1,2]]
  y <- testdat$y[i[1,1]:i[1,2]]
  y[1] <- 1

  for (j in 2:length(y)) {
    #y[j] <- (y[j-1]^2) + x[j] ## overflows to Inf on long runs
    y[j] <- y[j-1] + x[j]
  }

  return(y)
}

parallel::stopCluster(cl)

Finally, you can write the results back into the original data frame:

testdat$y <- unlist(r)

As for timing, the foreach loop runs in about 40 seconds on my 8-core machine.
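If you prefer working with whole data-frame chunks rather than index arithmetic, an equivalent pattern (a sketch, not from the original answer) is to split() by ID and apply a per-ID function with lapply(); on Unix-alikes you can swap lapply() for parallel::mclapply(), or use parLapply() with a cluster, to run the chunks in parallel:

```r
library(parallel)

# Sketch: run the question's per-ID recurrence on split() chunks.
# Replace lapply() with mclapply() (Unix) or parLapply() to parallelize.
set.seed(123)
df <- data.frame(ids   = rep(1:3, each = 5),
                 month = rep(1:5, 3),
                 x     = round(rnorm(15, 2, 5)),
                 y     = 0)

run_id <- function(chunk) {
  chunk$y[1] <- 1                                # first row of each ID
  for (j in 2:nrow(chunk)) {
    chunk$y[j] <- chunk$y[j - 1]^2 + chunk$x[j]  # the question's recurrence
  }
  chunk
}

res <- do.call(rbind, lapply(split(df, df$ids), run_id))
```

Since each chunk is handled by one call to run_id(), the 6-7 lines of real functions from the EDIT can all go inside that function with no ifelse() juggling.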
