MPI python - Open MPI


Problem description

I have a 20,000*515 numpy matrix representing biological data. I need to find the correlation coefficients of the biological data, meaning that as a result I would have a 20,000*20,000 matrix of correlation values. Then I populate a numpy array with 1's and 0's according to whether each correlation coefficient is greater than a threshold value.

I used numpy.corrcoef to find the correlation coefficients, and it works well on my machine.
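In essence, this serial step is just the following (a simplified sketch, using the same MyData.csv file and 0.9 threshold that appear in the full code below):

    import numpy

    # load the biological data: 20,000 samples with 515 measurements each
    data = numpy.genfromtxt('MyData.csv')
    # numpy.corrcoef treats each row as a variable, so this yields a 20,000*20,000 matrix
    corr = numpy.corrcoef(data)
    # 1 where the absolute correlation reaches the threshold, 0 elsewhere
    adjacency = (numpy.absolute(corr) >= 0.9).astype(numpy.int8)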

Then I wanted to run it on a cluster (10 computers, with the node count varying from 2 to 8). When I tried it on the cluster, with each node generating 40 random numbers and picking those 40 random columns from the biological data (resulting in a 20,000*40 matrix), I ran into a memory issue:

mpirun noticed that process rank # with PID # on node name exited on signal 9 (Killed).

Then I tried rewriting the program so that, instead of creating a full correlation matrix, it takes each row, finds the correlation coefficients, and stores a 1 in the matrix if the value is greater than the threshold and a 0 otherwise. But it takes about 1.5 hours to run this program, and I need to run it 100 times.

Can anyone suggest a better way of doing this, e.g. how to solve the memory issue by handing out new jobs once each core has finished its current one? I am new to MPI. My code is below.

If you need any more information, please let me know. Thank you.

    import numpy
    from mpi4py import MPI
    import time

    Size = MPI.COMM_WORLD.Get_size()
    Rank = MPI.COMM_WORLD.Get_rank()
    Name = MPI.Get_processor_name()

    RandomNumbers = {}

    # each process picks 40 random columns out of the 515 available ones
    rndm_indx = numpy.random.choice(range(515), 40, replace=False)
    rndm_indx = numpy.sort(rndm_indx)

    # read only those 40 columns: Data has shape (20000, 40)
    Data = numpy.genfromtxt('MyData.csv', usecols=rndm_indx)

    RandomNumbers[Rank] = rndm_indx
    CORR_CR = numpy.zeros((Data.shape[0], Data.shape[0]))

    start = time.time()
    for i in range(0, Data.shape[0]):
        # center row i and precompute the inverse of its norm
        Data[i] = Data[i] - numpy.mean(Data[i])
        alpha1 = 1. / numpy.linalg.norm(Data[i])
        for j in range(i, Data.shape[0]):
            if i == j:
                CORR_CR[i][j] = 1
            else:
                Data[j] = Data[j] - numpy.mean(Data[j])
                alpha2 = 1. / numpy.linalg.norm(Data[j])
                # correlation coefficient = normalized inner product, thresholded at 0.9
                corr = numpy.inner(Data[i], Data[j]) * (alpha1 * alpha2)
                corr = int(numpy.absolute(corr) >= 0.9)
                CORR_CR[i][j] = CORR_CR[j][i] = corr
    end = time.time()

    # remove the diagonal of ones
    CORR_CR = CORR_CR - numpy.eye(CORR_CR.shape[0])

    elapsed = end - start
    print('Total Time', elapsed)

Recommended answer

The execution time of the program you posted is about 96 s on my computer. Let's optimize a couple of things before exploring parallel computations.

  • Let's store the norms of the vectors to avoid computing them each time a norm is needed. Getting alpha1=1./numpy.linalg.norm(Data[i]); out of the second loop is a good starting point. Since the vectors do not change during the computations, their norms can be computed in advance:

alpha=numpy.zeros(Data.shape[0])
for i in range(0,Data.shape[0]):
  Data[i]=Data[i]-numpy.mean(Data[i])
  alpha[i]=1./numpy.linalg.norm(Data[i])

for i in range(0,Data.shape[0]):
  for j in range(i,Data.shape[0]):
    if(i==j):
       CORR_CR[i][j]=1;
    else:
       corr=numpy.inner(Data[i],Data[j])*(alpha[i]*alpha[j]);
       corr=int(numpy.absolute(corr)>=0.9)
       CORR_CR[i][j]=CORR_CR[j][i]=corr

The computation time is already down to 17 s.

  • Assuming that the vectors are not highly correlated, most of the correlation coefficients will be rounded to zero. Hence, the matrix is likely to be sparse (full of zeros). Let's use the scipy.sparse.coo_matrix sparse matrix format, which is very easy to populate: the non-null items and their coordinates i,j are simply appended to lists:

data=[]
ii=[]
jj=[]
...
  if(corr!=0):
           data.append(corr)
           ii.append(i)
           jj.append(j)
           data.append(corr)
           ii.append(j)
           jj.append(i)
...
CORR_CR=scipy.sparse.coo_matrix((data,(ii,jj)), shape=(Data.shape[0],Data.shape[0]))

The computation time is down to 13 s (a negligible improvement?) but the memory footprint is greatly reduced. That would be a major improvement if larger datasets were to be considered: for the full 20,000-sample problem, a dense 20,000*20,000 float64 matrix alone already takes about 3.2 GB.

  • for loops in Python are pretty inefficient. See for loop in python is 10x slower than matlab, for instance. But there are plenty of ways around this, such as using vectorized operations or optimized iterators like those provided by numpy.nditer. One of the reasons why for loops are inefficient is that Python is an interpreted language: no compilation occurs in the process. Hence, to overcome this problem, the trickiest part of the code can be compiled by using a tool like Cython:

  1. The critical part of the code is written in Cython, in a dedicated file correlator.pyx.
  2. This file is turned into a correlator.c file by Cython.
  3. This file is compiled by your favorite C compiler (gcc) to build a shared library correlator.so.
  4. The optimized function can then be used in your program after import correlator.

The content of correlator.pyx, starting from Numpy vs Cython speed, looks like:

import numpy

cimport numpy
cimport scipy.linalg.cython_blas
ctypedef numpy.float64_t DTYPE_t
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def process(numpy.ndarray[DTYPE_t, ndim=2] array,numpy.ndarray[DTYPE_t, ndim=1] alpha,int imin,int imax):
    # computes rows imin to imax-1 of the thresholded correlation matrix
    # and returns the non-null entries as coordinate/value lists
    cdef unsigned int rows = array.shape[0]
    cdef int cols = array.shape[1]
    cdef unsigned int row, row2
    cdef int one=1
    ii=[]
    jj=[]
    data=[]

    for row in range(imin,imax):
        for row2 in range(row,rows):
            if row==row2:
                data.append(0)
                ii.append(row)
                jj.append(row2)
            else:
                # inner product of the two centered rows via BLAS ddot, then normalization
                corr=scipy.linalg.cython_blas.ddot(&cols,&array[row,0],&one,&array[row2,0],&one)*alpha[row]*alpha[row2]
                corr=int(numpy.absolute(corr)>=0.9)
                if(corr!=0):
                    data.append(corr)
                    ii.append(row)
                    jj.append(row2)

                    data.append(corr)
                    ii.append(row2)
                    jj.append(row)

    return ii,jj,data

where scipy.linalg.cython_blas.ddot() performs the inner product.

To cythonize and compile the .pyx file, the following makefile can be used (I hope you are using Linux...):

all: correlator correlatorb


correlator: correlator.pyx
    cython -a correlator.pyx

correlatorb: correlator.c
    gcc -shared -pthread -fPIC -fwrapv -O2 -Wall -fno-strict-aliasing -I/usr/include/python2.7 -o correlator.so correlator.c
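
A setup.py using Cython's standard build machinery is an alternative to the makefile (just a sketch, assuming Cython, numpy and a C compiler are installed); it also picks up the numpy headers required by cimport numpy:

# setup.py -- build the extension in place with:  python setup.py build_ext --inplace
from setuptools import setup, Extension
from Cython.Build import cythonize
import numpy

setup(ext_modules=cythonize([
    Extension("correlator", ["correlator.pyx"],
              include_dirs=[numpy.get_include()])
]))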

The new correlation function is called in the main Python file by:

import correlator
ii,jj,data=correlator.process(Data,alpha,0,Data.shape[0])

By using a compiled loop, the time is down to 5.4 s! That is already ten times faster. Moreover, these computations are still performed on a single process!

Let's introduce parallel computations. Notice that two arguments have been added to the function process: imin and imax. These arguments define the range of rows of CORR_CR that are computed, in anticipation of parallel computation. Indeed, a straightforward way to parallelize the program is to split the outer for loop (index i) across the different processes.

Each process will perform the outer for loop for a particular range of the index i, and this range is computed so as to balance the workload between the processes.

The program has to complete the following operations:

  1. Process 0 (the "root" process) reads the vectors Data from the file.
  2. Data is broadcast to all processes using the MPI function bcast().
  3. The range of indexes i to be handled by each process is computed.
  4. The correlation coefficients are computed by each process for its range of indexes. The non-null terms of the matrix are stored in lists data, ii, jj on each process.
  5. These lists are gathered on the root process by calling the MPI function gather(). This produces three lists of Size lists, which are concatenated to obtain the three lists required to create the sparse adjacency matrix.

The code reads:

import numpy
from mpi4py import MPI
import time
import scipy.sparse

import warnings
warnings.simplefilter('ignore',scipy.sparse.SparseEfficiencyWarning)

Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();

#a dummy set of data is created here. 
#Samples such that (i-j)%10==0 are highly correlated.
RandomNumbers={};
rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)

Data=numpy.ascontiguousarray(numpy.zeros((2000,515),dtype=numpy.float64))
if Rank==0:
    #Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);
    Data=numpy.ascontiguousarray(numpy.random.rand(2000,515))
    lin=numpy.linspace(0.,1.,515)
    for i in range(Data.shape[0]):
         Data[i]+=numpy.sin((1+i%10)*10*lin)*100

start=time.time();

#broadcasting the matrix to all processes
Data=MPI.COMM_WORLD.bcast(Data, root=0)

RandomNumbers[Rank]=rndm_indx;
print(Data.shape[0])

#an array to store the inverse of the norm of each sample
alpha=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
        Data[i]=Data[i]-numpy.mean(Data[i])
        if numpy.linalg.norm(Data[i])==0:
            print "process "+str(Rank)+" is facing a big problem"
        else:
            alpha[i]=1./numpy.linalg.norm(Data[i])


#balancing the computational load between the processes. 
#Each process must perform about Data.shape[0]*Data.shape[0]/(2*Size) correlation.
#each process cares for a set of rows. 
#Of course, the last rank must care about more rows than the first one.

ilimits=numpy.zeros(Size+1,numpy.int32)
if Rank==0:
    nbtaskperprocess=Data.shape[0]*Data.shape[0]/(2*Size)
    icurr=0
    for i in range(Size):
        nbjob=0
        while(nbjob<nbtaskperprocess and icurr<=Data.shape[0]):
            nbjob+=(Data.shape[0]-icurr)
            icurr+=1
        ilimits[i+1]=icurr
    ilimits[Size]=Data.shape[0]
ilimits=MPI.COMM_WORLD.bcast(ilimits, root=0)          

#the "local" job has been cythonized in the file correlator.pyx
import correlator
ii,jj,data=correlator.process(Data,alpha,ilimits[Rank],ilimits[Rank+1])

#gathering the matrix inputs from every processes on the root process.
data = MPI.COMM_WORLD.gather(data, root=0)
ii = MPI.COMM_WORLD.gather(ii, root=0)
jj = MPI.COMM_WORLD.gather(jj, root=0)

if Rank==0:
   #concatenate the lists
   data2=sum(data,[])
   ii2=sum(ii,[])
   jj2=sum(jj,[])
   #create the adjacency matrix
   CORR_CR=scipy.sparse.coo_matrix((data2,(ii2,jj2)), shape=(Data.shape[0],Data.shape[0]))

   print(CORR_CR)

end=time.time();           

elapsed=(end-start)
print('Total Time',elapsed)

By running mpirun -np 4 main.py, the computation time is 3.4 s. It is not 4 times faster... This is most likely because the bottleneck of the computation is the evaluation of the scalar products, which requires a large memory bandwidth, and my personal computer is really limited regarding memory bandwidth...

Last comment: there are plenty of possibilities for improvement.

  • The vectors in Data are copied to every process... This affects the memory required by the program. Dispatching the computation in a different way and trading memory for communications could overcome this problem...

  • Each process still computes the norms of all the vectors... This could be improved by having each process compute the norms of only some of the vectors and using the MPI function allreduce() on alpha, as sketched below.
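
A rough sketch of that second idea (an illustration only, assuming ilimits has already been computed and broadcast as in the program above; the centering is still done everywhere because every process needs all the rows for the inner products):

alpha=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
    Data[i]=Data[i]-numpy.mean(Data[i])            # centering is still needed on every process
for i in range(ilimits[Rank],ilimits[Rank+1]):     # inverse norms only for this process's rows
    alpha[i]=1./numpy.linalg.norm(Data[i])
# element-wise sum of the partial alpha arrays gives the complete array on every process
MPI.COMM_WORLD.Allreduce(MPI.IN_PLACE,alpha,op=MPI.SUM)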

What to do with this adjacency matrix?

I think you already know the answer to this question, but you can provide this adjacency matrix to the sparse.csgraph routines, such as connected_components() or laplacian(). Indeed, you are not very far from spectral clustering!

For instance, if the clusters are obvious, using connected_components() is sufficient:

import scipy.sparse.csgraph

if Rank==0:
    # coo to csr format
    S=CORR_CR.tocsr()
    print(S)
    n_components, labels=scipy.sparse.csgraph.connected_components(S, directed=False, connection='weak', return_labels=True)

    print("number of different families "+str(n_components))

    numpy.set_printoptions(threshold=numpy.inf)
    print(labels)
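
If the clusters are less obvious, the next step toward spectral clustering could look like the following sketch (an illustration of the idea only: laplacian() and an eigensolver are the relevant routines, while the number of clusters k is an assumption to be chosen for the actual data):

import scipy.sparse.linalg
from scipy.cluster.vq import kmeans2

if Rank==0:
    k=4  # assumed number of clusters
    # normalized graph Laplacian of the adjacency matrix S from above
    L=scipy.sparse.csgraph.laplacian(S.astype(numpy.float64),normed=True)
    # eigenvectors of the k smallest eigenvalues embed the samples in a k-dimensional space
    vals,vecs=scipy.sparse.linalg.eigsh(L,k=k,which='SM')
    # cluster the embedded samples with k-means
    centroids,labels2=kmeans2(vecs,k)
    print(labels2)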

