mpi4py scatter and gather with large numpy arrays
Question
I am trying to parallelise some operations on a large numpy array using mpi4py. I am currently using numpy.array_split to divide the array into chunks, followed by comm.scatter to send the chunks to different cores, and then comm.gather to collect the resulting arrays. A minimal (non-)working example is below:
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    test = np.random.rand(411, 48, 52, 40)
    test_chunks = np.array_split(test, size, axis=0)
else:
    test_chunks = None

test_chunk = comm.scatter(test_chunks, root=0)
output_chunk = np.zeros([np.shape(test_chunk)[0], 128, 128, 128])
for i in range(np.shape(test_chunk)[0]):
    print(i)
    output_chunk[i, 0:48, 0:52, 0:40] = test_chunk[i]

outputData = comm.gather(output_chunk, root=0)
if rank == 0:
    outputData = np.concatenate(outputData, axis=0)
Running this gives me the error:
File "test_4d.py", line 23, in <module>
outputData = comm.gather(output_chunk,root=0)
File "Comm.pyx", line 869, in mpi4py.MPI.Comm.gather (src/mpi4py.MPI.c:73266)
File "pickled.pxi", line 614, in mpi4py.MPI.PyMPI_gather (src/mpi4py.MPI.c:33592)
File "pickled.pxi", line 146, in mpi4py.MPI._p_Pickle.allocv (src/mpi4py.MPI.c:28517)
File "pickled.pxi", line 95, in mpi4py.MPI._p_Pickle.alloc (src/mpi4py.MPI.c:27832)
SystemError: Negative size passed to PyString_FromStringAndSize
This error seems to result from the large size of the numpy arrays being collected by gather: since the lowercase scatter and gather send the arrays as pickled objects, large arrays can overflow the pickle buffer. One suggestion I have come across is to use comm.Scatter and comm.Gather, which communicate raw memory buffers instead. However, I am struggling to find clear documentation for these functions and so far have been unable to implement them successfully. For example, replacing:
Replacing the line
outputData = comm.gather(output_chunk, root=0)
with
outputData = comm.Gather(sendbuf=[test_chunks, MPI.DOUBLE], recvbuf=[output_chunk, MPI.DOUBLE], root=0)
gives the error:
File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
File "message.pxi", line 51, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19644)
File "asbuffer.pxi", line 108, in mpi4py.MPI.getbuffer (src/mpi4py.MPI.c:6757)
File "asbuffer.pxi", line 50, in mpi4py.MPI.PyObject_GetBufferEx (src/mpi4py.MPI.c:6093)
TypeError: expected a readable buffer object
or the following line:
outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)
gives the error:
File "test_4d_2.py", line 24, in <module>
outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)
File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
File "message.pxi", line 60, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19747)
TypeError: unhashable type: 'numpy.ndarray'
Furthermore, the input matrix test may also increase in size, which could cause similar problems for comm.scatter. Aside from the problems I already have with comm.Gather, I am not sure how to set up comm.Scatter, since recvbuf is defined based on the size of test_chunk, which is the output of comm.scatter, so I cannot specify recvbuf within comm.Scatter.
Answer
The solution is to use comm.Scatterv and comm.Gatherv, which send and receive the data as a block of memory rather than as a list of numpy arrays, getting around the data-size issue. comm.Scatterv and comm.Gatherv assume a block of data in C order (row-major) in memory, and it is necessary to specify two vectors, sendcounts and displacements: sendcounts gives the number of elements to send to each core, while displacements gives the offset (starting index) of each core's chunk within the flattened send buffer. Hence it is possible to vary the amount of data sent to each core. More details can be found here: http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html
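As a concrete illustration, the two vectors for the question's (411, 48, 52, 40) array split along axis 0 might be computed like this (plain numpy arithmetic, no MPI calls; the helper name and the split sizes, which follow numpy.array_split's convention, are my own sketch):

```python
import numpy as np

def counts_and_displacements(n_rows, size, row_elems):
    """Element counts and offsets for splitting an array of n_rows rows
    (each row flattening to row_elems elements) across `size` ranks,
    mimicking numpy.array_split along axis 0."""
    base, rem = divmod(n_rows, size)
    # array_split gives the first (n_rows % size) ranks one extra row
    rows_per_rank = [base + 1 if r < rem else base for r in range(size)]
    sendcounts = np.array([n * row_elems for n in rows_per_rank])
    displacements = np.insert(np.cumsum(sendcounts), 0, 0)[:-1]
    return sendcounts, displacements

# For the question's test array of shape (411, 48, 52, 40) on 4 cores:
counts, displs = counts_and_displacements(411, 4, 48 * 52 * 40)
print(counts)   # elements sent to each rank
print(displs)   # starting offset of each rank's block in the flat buffer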
An example using comm.Scatterv and comm.Gatherv for a 2D matrix is given here:
Along what axis does mpi4py Scatterv function split a numpy array?