mpi4py scatter and gather with large numpy arrays

Question

I am trying to parallelise some operations on a large numpy array using mpi4py. I am currently using numpy.array_split to divide the array into chunks, followed by comm.scatter to send the chunks to different cores, and then comm.gather to collect the resulting arrays. A minimal (non-working) example is below:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Root builds the full array and splits it along axis 0.
if rank == 0:
    test = np.random.rand(411, 48, 52, 40)
    test_chunks = np.array_split(test, size, axis=0)
else:
    test_chunks = None

# Lower-case scatter/gather pickle their arguments.
test_chunk = comm.scatter(test_chunks, root=0)
output_chunk = np.zeros([np.shape(test_chunk)[0], 128, 128, 128])

# Embed each slice of the chunk into a larger zero-padded array.
for i in range(np.shape(test_chunk)[0]):
    print(i)
    output_chunk[i, 0:48, 0:52, 0:40] = test_chunk[i]

outputData = comm.gather(output_chunk, root=0)

if rank == 0:
    outputData = np.concatenate(outputData, axis=0)

Running this gives me the error:

  File "test_4d.py", line 23, in <module>
    outputData = comm.gather(output_chunk,root=0)
  File "Comm.pyx", line 869, in mpi4py.MPI.Comm.gather (src/mpi4py.MPI.c:73266)
  File "pickled.pxi", line 614, in mpi4py.MPI.PyMPI_gather (src/mpi4py.MPI.c:33592)
  File "pickled.pxi", line 146, in mpi4py.MPI._p_Pickle.allocv (src/mpi4py.MPI.c:28517)
  File "pickled.pxi", line 95, in mpi4py.MPI._p_Pickle.alloc (src/mpi4py.MPI.c:27832)
SystemError: Negative size passed to PyString_FromStringAndSize

This error seems to result from the large size of the numpy arrays being collected by gather; since the lower-case scatter and gather pickle the arrays and send them as a list, it appears easy to exceed the maximum message size. One suggestion I have come across is to use comm.Scatter and comm.Gather. However, I am struggling to find clear documentation for these functions and so far have been unable to implement them successfully. For example:

Replacing the line

outputData = comm.gather(output_chunk,root=0)

with the line

outputData = comm.Gather(sendbuf=[test_chunks, MPI.DOUBLE], recvbuf=[output_chunk, MPI.DOUBLE], root=0)

gives the error:

  File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
  File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
  File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
  File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
  File "message.pxi", line 51, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19644)
  File "asbuffer.pxi", line 108, in mpi4py.MPI.getbuffer (src/mpi4py.MPI.c:6757)
  File "asbuffer.pxi", line 50, in mpi4py.MPI.PyObject_GetBufferEx (src/mpi4py.MPI.c:6093)
TypeError: expected a readable buffer object

Alternatively, using the line

outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)

gives the error:

  File "test_4d_2.py", line 24, in <module>
    outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)
  File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
  File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
  File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
  File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
  File "message.pxi", line 60, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19747)
TypeError: unhashable type: 'numpy.ndarray'

Furthermore, the input matrix test may also increase in size, which could cause similar problems for comm.scatter. Aside from the problems I already have with comm.Gather, I am not sure how to set up comm.Scatter: recvbuf is defined based on the size of test_chunk, which is the output of comm.scatter, so I cannot specify recvbuf within comm.Scatter.

Answer

The solution is to use comm.Scatterv and comm.Gatherv, which send and receive the data as a block of memory rather than as a list of numpy arrays, getting around the message size issue. comm.Scatterv and comm.Gatherv assume the data is laid out as a C-order (row-major) block in memory, and two vectors must be specified: sendcounts and displacements. sendcounts gives the number of elements to send to each core, while displacements gives the offset (starting index) of each core's segment within the send buffer. Hence it is possible to vary the amount of data sent to each core. More details can be found here: http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html
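
As a concrete illustration for the 4D array in the question, here is a minimal sketch of the scatter side. It assumes the global shape is known on every rank, so each process can compute its own chunk size and allocate its receive buffer up front; the bookkeeping mirrors the uneven split that np.array_split produces along axis 0, and the names slices, counts and displs are illustrative, not part of the mpi4py API:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

global_shape = (411, 48, 52, 40)  # assumed known on every rank
elems_per_slice = int(np.prod(global_shape[1:]))  # elements in one axis-0 slice

# Mirror np.array_split: the first (N % size) ranks get one extra slice.
slices = np.full(size, global_shape[0] // size, dtype=int)
slices[: global_shape[0] % size] += 1

counts = slices * elems_per_slice  # number of elements sent to each rank
displs = np.concatenate(([0], np.cumsum(counts)[:-1]))  # element offsets

if rank == 0:
    test = np.random.rand(*global_shape)  # C-ordered by default
    sendbuf = [test, counts, displs, MPI.DOUBLE]
else:
    sendbuf = None  # ignored on non-root ranks

# Every rank can size its own receive buffer from rank, size and the
# global shape, which resolves the recvbuf problem raised in the question.
test_chunk = np.empty((slices[rank],) + global_shape[1:])
comm.Scatterv(sendbuf, test_chunk, root=0)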

An example using comm.Scatterv and comm.Gatherv for a 2D matrix is given here: Along what axis does mpi4py Scatterv function split a numpy array?
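
For the 4D arrays in the question, the gather side can be handled the same way. Below is a sketch under the same assumptions (the np.array_split-style bookkeeping is repeated so the snippet stands alone, and the loop that embeds each chunk into the padded array is elided):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

n_total = 411  # length of the split axis
out_shape = (128, 128, 128)  # shape of each padded output slice
elems_per_out = int(np.prod(out_shape))

# Same np.array_split-style split as on the scatter side.
slices = np.full(size, n_total // size, dtype=int)
slices[: n_total % size] += 1
counts = slices * elems_per_out
displs = np.concatenate(([0], np.cumsum(counts)[:-1]))

# Each rank fills its own zero-padded chunk (embedding loop elided).
output_chunk = np.zeros((slices[rank],) + out_shape)

if rank == 0:
    outputData = np.empty((n_total,) + out_shape)
    recvbuf = [outputData, counts, displs, MPI.DOUBLE]
else:
    recvbuf = None  # ignored on non-root ranks

comm.Gatherv(output_chunk, recvbuf, root=0)

Since Scatterv and Gatherv operate on raw buffers, no pickling takes place and the SystemError from the original code no longer applies. Note that root still has to hold the full gathered result, here 411 x 128 x 128 x 128 doubles, roughly 6.9 GB.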
