Scatter Matrix Blocks of Different Sizes using MPI


Question


(Suppose all the matrices are stored in row-major order.) An example that illustrates the problem is to distribute a 10x10 matrix over a 3x3 grid, so that the size of the sub-matrix on each node looks like:

|-----+-----+-----|
| 3x3 | 3x3 | 3x4 |
|-----+-----+-----|
| 3x3 | 3x3 | 3x4 |
|-----+-----+-----|
| 4x3 | 4x3 | 4x4 |
|-----+-----+-----|

I've seen many posts on Stack Overflow (such as "sending blocks of 2D array in C using MPI" and "MPI partition matrix into blocks"). But they only deal with blocks of the same size (in which case we can simply use MPI_Type_vector or MPI_Type_create_subarray and only one MPI_Scatterv call).
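(For reference, a minimal sketch of that equal-block case, assuming a bs x bs block on a pgrid[0] x pgrid[1] grid of nprocs ranks and hypothetical variable names; the trick is to resize the subarray type to a 1-char extent so the displacements can be given in chars:)

/* equal-size blocks only - one resized subarray type, one scatterv */
int gsizes[2]   = { pgrid[0]*bs, pgrid[1]*bs };
int subsizes[2] = { bs, bs };
int starts[2]   = { 0, 0 };
MPI_Datatype block, blk1char;
MPI_Type_create_subarray(2, gsizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &block);
MPI_Type_create_resized(block, 0, sizeof(char), &blk1char);   /* extent = 1 char */
MPI_Type_commit(&blk1char);

int counts[nprocs], displs[nprocs];
for (int p=0; p<nprocs; p++) {
    int r = p / pgrid[1], c = p % pgrid[1];
    counts[p] = 1;
    displs[p] = r*bs*gsizes[1] + c*bs;    /* in chars, thanks to the resize */
}
MPI_Scatterv(global, counts, displs, blk1char,
             local, bs*bs, MPI_CHAR, 0, MPI_COMM_WORLD);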

So I'm wondering: what is the most efficient way in MPI to scatter a matrix to a grid of processors, where each processor gets a block of a specified size?

P.S. I've also looked at MPI_Type_create_darray, but it doesn't seem to let you specify a block size for each processor.

Solution

You have to go through at least one extra step in MPI to do this.

The problem is that the most general of the gather/scatter routines, MPI_Scatterv and MPI_Gatherv, allow you to pass a "vector" (v) of counts/displacements, rather than just the single count of Scatter and Gather, but the types are all assumed to be the same. Here, there's no way around it; the memory layout of each block is different, and so has to be described by a different type. If there were only one difference between the blocks - some had different numbers of columns, or some had different numbers of rows - then just using different counts would suffice. But with different numbers of both columns and rows, counts won't do it; you really need to be able to specify different types.
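To make that concrete, here is the MPI_Scatterv prototype (the standard MPI-3 signature, shown for reference): the counts and displacements are per-rank, but there is exactly one sendtype shared by every destination.

int MPI_Scatterv(const void *sendbuf, const int sendcounts[], const int displs[],
                 MPI_Datatype sendtype,              /* a single type for all ranks */
                 void *recvbuf, int recvcount, MPI_Datatype recvtype,
                 int root, MPI_Comm comm);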

So what you really want is an often-discussed but never implemented MPI_Scatterw (where the w means vv; i.e., both counts and types are vectors) routine. But such a thing doesn't exist. The closest you can get is the much more general MPI_Alltoallw call, which allows completely general all-to-all sending and receiving of data; as the spec states, "The MPI_ALLTOALLW function generalizes several MPI functions by carefully selecting the input arguments. For example, by making all but one process have sendcounts(i) = 0, this achieves an MPI_SCATTERW function."
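For reference, the MPI_Alltoallw prototype (again the standard MPI-3 signature) takes a datatype per rank in both directions, and its displacements are in bytes rather than in elements of a single type:

int MPI_Alltoallw(const void *sendbuf, const int sendcounts[], const int sdispls[],
                  const MPI_Datatype sendtypes[],    /* one type per destination rank */
                  void *recvbuf, const int recvcounts[], const int rdispls[],
                  const MPI_Datatype recvtypes[],    /* one type per source rank */
                  MPI_Comm comm);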

So you can do this with MPI_Alltoallw by having all processes other than the one that originally has all the data (we'll assume it's rank 0 here) set all of their send counts to zero. All tasks will also have all of their receive counts set to zero except for the first entry - the amount of data they'll get from rank zero.
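As a sketch, that setup looks like this (it's the same initialization that appears in the full listing at the end; size, localsizes, etc. are as defined there):

int sendcounts[size], senddispls[size], recvcounts[size], recvdispls[size];
MPI_Datatype sendtypes[size], recvtypes[size];

/* default: send nothing to anyone, receive nothing from anyone */
for (int proc=0; proc<size; proc++) {
    sendcounts[proc] = recvcounts[proc] = 0;
    senddispls[proc] = recvdispls[proc] = 0;
    sendtypes[proc]  = recvtypes[proc]  = MPI_CHAR;
}
/* every rank receives exactly its own contiguous block, and it comes from rank 0 */
recvcounts[0] = localsizes[0]*localsizes[1];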

For process 0's send side, we first have to define four different types (for the 4 different sizes of subarray); the send counts will then all be 1, and the only part that remains is figuring out the send displacements (which, unlike with scatterv, are here in units of bytes, because there's no single type that could be used as a unit):

        /* 4 types of blocks, indexed by (extra row, extra col) - rows x cols:
         * blocksize x blocksize, blocksize x (blocksize+1), (blocksize+1) x blocksize, (blocksize+1) x (blocksize+1)
         */

        MPI_Datatype blocktypes[4];
        int subsizes[2];
        int starts[2] = {0,0};
        for (int i=0; i<2; i++) {
           subsizes[0] = blocksize+i;
           for (int j=0; j<2; j++) {
               subsizes[1] = blocksize+j;
               MPI_Type_create_subarray(2, globalsizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &blocktypes[2*i+j]);
               MPI_Type_commit(&blocktypes[2*i+j]);
           }
        }

        /* now figure out the displacement and type of each processor's data */
        for (int proc=0; proc<size; proc++) {
            int row, col;
            rowcol(proc, blocks, &row, &col);

            sendcounts[proc] = 1;
            senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize)*sizeof(char);

            int idx = typeIdx(row, col, blocks);
            sendtypes[proc] = blocktypes[idx];
        }
    }

    MPI_Alltoallw(globalptr, sendcounts, senddispls, sendtypes,
                  &(localdata[0][0]), recvcounts, recvdispls, recvtypes, 
                  MPI_COMM_WORLD);

And this will work.

But the problem is that the Alltoallw function is so completely general that it's difficult for implementations to do much in the way of optimization; so I'd be surprised if this performed as well as a scatter of equally-sized blocks.

So another approach is to do the communication in two phases.

The simplest such approach follows from noting that you can get almost all of the data where it needs to go with a single MPI_Scatterv() call: in your example, if we operate in units of a single column vector with columns=1 and rows=3 (the number of rows in most of the blocks of the domain), you can scatter almost all of the global data to the other processors. The processors each get 3 or 4 of these vectors, which distributes all of the data except the very last row of the global array; that last row can be handled by a simple second scatterv. That looks like this:

/* We're going to be operating mostly in units of a single column of a "normal" sized block.
 * There will need to be two vectors describing these columns; one in the context of the
 * global array, and one in the local results.
 */
MPI_Datatype vec, localvec;
MPI_Type_vector(blocksize, 1, localsizes[1], MPI_CHAR, &localvec);
MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
MPI_Type_commit(&localvec);

MPI_Type_vector(blocksize, 1, globalsizes[1], MPI_CHAR, &vec);
MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
MPI_Type_commit(&vec);

/* The originating process needs to allocate and fill the source array,
 * and then define types defining the array chunks to send, and
 * fill out senddispls, sendcounts (1) and sendtypes.
 */
if (rank == 0) {
    /* create the vector type which will send one column of a "normal" sized-block */
    /* then all processors except those in the last row need to get blocksize*vec or (blocksize+1)*vec */
    /* will still have to do something to tidy up the last row of values */
    /* we need to make the type have extent of 1 char for scattering */
    for (int proc=0; proc<size; proc++) {
        int row, col;
        rowcol(proc, blocks, &row, &col);

        sendcounts[proc] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
        senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize);
    }
}

recvcounts = localsizes[1];
MPI_Scatterv(globalptr, sendcounts, senddispls, vec,
              &(localdata[0][0]), recvcounts, localvec, 0, MPI_COMM_WORLD);

MPI_Type_free(&localvec);
if (rank == 0)
    MPI_Type_free(&vec);

/* now we need to do one more scatter, scattering just the last row of data
 * just to the processors on the last row.
 * Here we recompute the send counts
 */
if (rank == 0) {
    for (int proc=0; proc<size; proc++) {
        int row, col;
        rowcol(proc, blocks, &row, &col);
        sendcounts[proc] = 0;
        senddispls[proc] = 0;

        if ( isLastRow(row,blocks) ) {
            sendcounts[proc] = blocksize;
            senddispls[proc] = (globalsizes[0]-1)*globalsizes[1]+col*blocksize;
            if ( isLastCol(col,blocks) )
                sendcounts[proc] += 1;
        }
    }
}

recvcounts = 0;
if ( isLastRow(myrow, blocks) ) {
    recvcounts = blocksize;
    if ( isLastCol(mycol, blocks) )
        recvcounts++;
}
MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
              &(localdata[blocksize][0]), recvcounts, MPI_CHAR, 0, MPI_COMM_WORLD);

So far so good. But it's a shame to have most of the processors sitting around doing nothing during that final, "cleanup" scatterv.

So a nicer approach is to scatter all the rows in a first phase, and then scatter that data amongst the columns in a second phase. Here we create new communicators, with each processor belonging to two of them - one containing the other processors in the same block row, and one containing those in the same block column. In the first step, the origin processor distributes all the rows of the global array to the other processors in the same column communicator, which can be done in a single scatterv. Then those processors, using a single scatterv and the same column data type as in the previous example, scatter the columns to each processor in their block row. The result is two fairly simple scattervs distributing all of the data:

/* create communicators which have processors with the same row or column in them*/
MPI_Comm colComm, rowComm;
MPI_Comm_split(MPI_COMM_WORLD, myrow, rank, &rowComm);
MPI_Comm_split(MPI_COMM_WORLD, mycol, rank, &colComm);

/* first, scatter the array by rows, with the processor in column 0 corresponding to each row
 * receiving the data */
if (mycol == 0) {
    int sendcounts[ blocks[0] ];
    int senddispls[ blocks[0] ];
    senddispls[0] = 0;

    for (int row=0; row<blocks[0]; row++) {
        /* each processor gets blocksize rows, each of size globalsizes[1]... */
        sendcounts[row] = blocksize*globalsizes[1];
        if (row > 0)
            senddispls[row] = senddispls[row-1] + sendcounts[row-1];
    }
    /* the last processor gets one more */
    sendcounts[blocks[0]-1] += globalsizes[1];

    /* allocate my rowdata */
    rowdata = allocchar2darray( sendcounts[myrow], globalsizes[1] );

    /* perform the scatter of rows */
    MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
                  &(rowdata[0][0]), sendcounts[myrow], MPI_CHAR, 0, colComm);

}

/* Now, within each row of processors, we can scatter the columns.
 * We can do this as we did in the previous example; create a vector
 * (and localvector) type and scatter accordingly */
int locnrows = blocksize;
if ( isLastRow(myrow, blocks) )
    locnrows++;
MPI_Datatype vec, localvec;
MPI_Type_vector(locnrows, 1, globalsizes[1], MPI_CHAR, &vec);
MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
MPI_Type_commit(&vec);

MPI_Type_vector(locnrows, 1, localsizes[1], MPI_CHAR, &localvec);
MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
MPI_Type_commit(&localvec);

int sendcounts[ blocks[1] ];
int senddispls[ blocks[1] ];
if (mycol == 0) {
    for (int col=0; col<blocks[1]; col++) {
        sendcounts[col] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
        senddispls[col] = col*blocksize;
    }
}
char *rowptr = (mycol == 0) ? &(rowdata[0][0]) : NULL;

MPI_Scatterv(rowptr, sendcounts, senddispls, vec,
              &(localdata[0][0]), sendcounts[mycol], localvec, 0, rowComm);

which is simpler and should be a relatively good balance between performance and robustness.

Running all three methods works:

bash-3.2$ mpirun -np 6 ./allmethods alltoall
Global array:
abcdefg
hijklmn
opqrstu
vwxyzab
cdefghi
jklmnop
qrstuvw
xyzabcd
efghijk
lmnopqr
Method - alltoall

Rank 0:
abc
hij
opq

Rank 1:
defg
klmn
rstu

Rank 2:
vwx
cde
jkl

Rank 3:
yzab
fghi
mnop

Rank 4:
qrs
xyz
efg
lmn

Rank 5:
tuvw
abcd
hijk
opqr

bash-3.2$ mpirun -np 6 ./allmethods twophasevecs
Global array:
abcdefg
hijklmn
opqrstu
vwxyzab
cdefghi
jklmnop
qrstuvw
xyzabcd
efghijk
lmnopqr
Method - two phase, vectors, then cleanup

Rank 0:
abc
hij
opq

Rank 1:
defg
klmn
rstu

Rank 2:
vwx
cde
jkl

Rank 3:
yzab
fghi
mnop

Rank 4:
qrs
xyz
efg
lmn

Rank 5:
tuvw
abcd
hijk
opqr
bash-3.2$ mpirun -np 6 ./allmethods twophaserowcol
Global array:
abcdefg
hijklmn
opqrstu
vwxyzab
cdefghi
jklmnop
qrstuvw
xyzabcd
efghijk
lmnopqr
Method - two phase - row, cols

Rank 0:
abc
hij
opq

Rank 1:
defg
klmn
rstu

Rank 2:
vwx
cde
jkl

Rank 3:
yzab
fghi
mnop

Rank 4:
qrs
xyz
efg
lmn

Rank 5:
tuvw
abcd
hijk
opqr

The code implementing these methods follows; you can set block sizes to more typical sizes for your problem and run on a realistic number of processors to get some sense of which will be best for your application.
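For instance, assuming the listing is saved as allmethods.c (mpicc is the usual compiler wrapper, though the name may differ between MPI installations; -std=c99 is needed for the variable-length arrays and loop-scoped declarations):

mpicc  -std=c99 -O2 -o allmethods allmethods.c
mpirun -np 12 ./allmethods twophaserowcol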

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "mpi.h"

/* auxiliary routines, found at end of program */

char **allocchar2darray(int n, int m);
void freechar2darray(char **a);
void printarray(char **data, int n, int m);
void rowcol(int rank, const int blocks[2], int *row, int *col);
int isLastRow(int row, const int blocks[2]);
int isLastCol(int col, const int blocks[2]);
int typeIdx(int row, int col, const int blocks[2]);

/* first method - alltoallw */
void alltoall(const int myrow, const int mycol, const int rank, const int size, 
                    const int blocks[2], const int blocksize, const int globalsizes[2], const int localsizes[2],
                    const char *const globalptr,  char **localdata) {
    /*
     * get send and receive counts ready for the alltoallw call. 
     * everyone will be receiving just one block from proc 0; 
     * most procs will be sending nothing to anyone. 
     */
    int sendcounts[ size ];
    int senddispls[ size ];
    MPI_Datatype sendtypes[size];
    int recvcounts[ size ];
    int recvdispls[ size ];
    MPI_Datatype recvtypes[size];

    for (int proc=0; proc<size; proc++) {
        recvcounts[proc] = 0;
        recvdispls[proc] = 0;
        recvtypes[proc] = MPI_CHAR;

        sendcounts[proc] = 0;
        senddispls[proc] = 0;
        sendtypes[proc] = MPI_CHAR;
    }
    recvcounts[0] = localsizes[0]*localsizes[1];
    recvdispls[0] = 0;


    /* The originating process needs to allocate and fill the source array,
     * and then define types defining the array chunks to send, and 
     * fill out senddispls, sendcounts (1) and sendtypes.
     */
    if (rank == 0) {
        /* 4 types of blocks, indexed by (extra row, extra col) - rows x cols:
         * blocksize x blocksize, blocksize x (blocksize+1), (blocksize+1) x blocksize, (blocksize+1) x (blocksize+1)
         */
        MPI_Datatype blocktypes[4];
        int subsizes[2];
        int starts[2] = {0,0};
        for (int i=0; i<2; i++) {
           subsizes[0] = blocksize+i;
           for (int j=0; j<2; j++) {
               subsizes[1] = blocksize+j;
               MPI_Type_create_subarray(2, globalsizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &blocktypes[2*i+j]);
               MPI_Type_commit(&blocktypes[2*i+j]);
           }
        }

        /* now figure out the displacement and type of each processor's data */
        for (int proc=0; proc<size; proc++) {
            int row, col;
            rowcol(proc, blocks, &row, &col);

            sendcounts[proc] = 1;
            senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize)*sizeof(char);

            int idx = typeIdx(row, col, blocks);
            sendtypes[proc] = blocktypes[idx];
        }
    }

    MPI_Alltoallw(globalptr, sendcounts, senddispls, sendtypes,
                  &(localdata[0][0]), recvcounts, recvdispls, recvtypes, 
                  MPI_COMM_WORLD);
}


/* second method: distribute almost all data using columns of size blocksize, 
 * then clean up the last row with another scatterv */

void twophasevecs(const int myrow, const int mycol, const int rank, const int size, 
                    const int blocks[2], const int blocksize, const int globalsizes[2], const int localsizes[2],
                    const char *const globalptr,  char **localdata) {
    int sendcounts[ size ];
    int senddispls[ size ];
    int recvcounts;

    for (int proc=0; proc<size; proc++) {
        sendcounts[proc] = 0;
        senddispls[proc] = 0;
    }

    /* We're going to be operating mostly in units of a single column of a "normal" sized block.
     * There will need to be two vectors describing these columns; one in the context of the
     * global array, and one in the local results.
     */
    MPI_Datatype vec, localvec;
    MPI_Type_vector(blocksize, 1, localsizes[1], MPI_CHAR, &localvec);
    MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
    MPI_Type_commit(&localvec);

    MPI_Type_vector(blocksize, 1, globalsizes[1], MPI_CHAR, &vec);
    MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
    MPI_Type_commit(&vec);

    /* The originating process needs to allocate and fill the source array,
     * and then define types defining the array chunks to send, and 
     * fill out senddispls, sendcounts (1) and sendtypes.
     */
    if (rank == 0) {
        /* create the vector type which will send one column of a "normal" sized-block */
        /* then all processors except those in the last row need to get blocksize*vec or (blocksize+1)*vec */
        /* will still have to do something to tidy up the last row of values */
        /* we need to make the type have extent of 1 char for scattering */
        for (int proc=0; proc<size; proc++) {
            int row, col;
            rowcol(proc, blocks, &row, &col);

            sendcounts[proc] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
            senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize);
        }
    }

    recvcounts = localsizes[1];
    MPI_Scatterv(globalptr, sendcounts, senddispls, vec,
                  &(localdata[0][0]), recvcounts, localvec, 0, MPI_COMM_WORLD);

    MPI_Type_free(&localvec);
    if (rank == 0)
        MPI_Type_free(&vec);

    /* now we need to do one more scatter, scattering just the last row of data 
     * just to the processors on the last row.
     * Here we recompute the sendcounts
     */
    if (rank == 0) {
        for (int proc=0; proc<size; proc++) {
            int row, col;
            rowcol(proc, blocks, &row, &col);
            sendcounts[proc] = 0;
            senddispls[proc] = 0;

            if ( isLastRow(row,blocks) ) {
                sendcounts[proc] = blocksize;
                senddispls[proc] = (globalsizes[0]-1)*globalsizes[1]+col*blocksize;
                if ( isLastCol(col,blocks) ) 
                    sendcounts[proc] += 1;
            }
        }
    }

    recvcounts = 0;
    if ( isLastRow(myrow, blocks) ) {
        recvcounts = blocksize;
        if ( isLastCol(mycol, blocks) )
            recvcounts++;
    }
    MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
                  &(localdata[blocksize][0]), recvcounts, MPI_CHAR, 0, MPI_COMM_WORLD);
}
/* third method: first distribute rows, then columns, each with a single scatterv */

void twophaseRowCol(const int myrow, const int mycol, const int rank, const int size, 
                    const int blocks[2], const int blocksize, const int globalsizes[2], const int localsizes[2],
                    const char *const globalptr,  char **localdata) {
    char **rowdata ;

    /* create communicators which have processors with the same row or column in them*/
    MPI_Comm colComm, rowComm;
    MPI_Comm_split(MPI_COMM_WORLD, myrow, rank, &rowComm);
    MPI_Comm_split(MPI_COMM_WORLD, mycol, rank, &colComm);

    /* first, scatter the array by rows, with the processor in column 0 corresponding to each row
     * receiving the data */
    if (mycol == 0) {
        int sendcounts[ blocks[0] ];
        int senddispls[ blocks[0] ];
        senddispls[0] = 0;

        for (int row=0; row<blocks[0]; row++) {
            /* each processor gets blocksize rows, each of size globalsizes[1]... */
            sendcounts[row] = blocksize*globalsizes[1];
            if (row > 0) 
                senddispls[row] = senddispls[row-1] + sendcounts[row-1];
        }
        /* the last processor gets one more */
        sendcounts[blocks[0]-1] += globalsizes[1];

        /* allocate my rowdata */
        rowdata = allocchar2darray( sendcounts[myrow], globalsizes[1] );

        /* perform the scatter of rows */
        MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
                      &(rowdata[0][0]), sendcounts[myrow], MPI_CHAR, 0, colComm);

    }

    /* Now, within each row of processors, we can scatter the columns.  
     * We can do this as we did in the previous example; create a vector
     * (and localvector) type and scatter accordingly */
    int locnrows = blocksize;
    if ( isLastRow(myrow, blocks) )
        locnrows++;

    MPI_Datatype vec, localvec;
    MPI_Type_vector(locnrows, 1, globalsizes[1], MPI_CHAR, &vec);
    MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
    MPI_Type_commit(&vec);

    MPI_Type_vector(locnrows, 1, localsizes[1], MPI_CHAR, &localvec);
    MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
    MPI_Type_commit(&localvec);

    int sendcounts[ blocks[1] ];
    int senddispls[ blocks[1] ];
    if (mycol == 0) {
        for (int col=0; col<blocks[1]; col++) {
            sendcounts[col] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
            senddispls[col] = col*blocksize;
        }
    }
    char *rowptr = (mycol == 0) ? &(rowdata[0][0]) : NULL;

    MPI_Scatterv(rowptr, sendcounts, senddispls, vec,
                  &(localdata[0][0]), sendcounts[mycol], localvec, 0, rowComm);

    MPI_Type_free(&localvec);
    MPI_Type_free(&vec);

    if (mycol == 0) 
        freechar2darray(rowdata);

    MPI_Comm_free(&rowComm);
    MPI_Comm_free(&colComm);
}

int main(int argc, char **argv) {

    int rank, size;
    int blocks[2] = {0,0};
    const int blocksize=3;
    int globalsizes[2], localsizes[2];
    char **globaldata;
    char *globalptr = NULL;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0 && argc < 2) {
        fprintf(stderr,"Usage: %s method
   Where method is one of: alltoall, twophasevecs, twophaserowcol
", argv[0]);
        MPI_Abort(MPI_COMM_WORLD,1);
    }

    /* calculate sizes for a 2d grid of processors */
    MPI_Dims_create(size, 2, blocks);

    int myrow, mycol;
    rowcol(rank, blocks, &myrow, &mycol);

    /* create array sizes so that last block has 1 too many rows/cols */
    globalsizes[0] = blocks[0]*blocksize+1;  
    globalsizes[1] = blocks[1]*blocksize+1;
    if (rank == 0) {
        globaldata = allocchar2darray(globalsizes[0], globalsizes[1]);
        globalptr = &(globaldata[0][0]);
        for (int i=0; i<globalsizes[0]; i++) 
            for (int j=0; j<globalsizes[1]; j++)
                globaldata[i][j] = 'a'+(i*globalsizes[1] + j)%26;

        printf("Global array: 
");
        printarray(globaldata, globalsizes[0], globalsizes[1]);
    }

    /* the local chunk we'll be receiving */
    localsizes[0] = blocksize; localsizes[1] = blocksize;
    if ( isLastRow(myrow,blocks)) localsizes[0]++;
    if ( isLastCol(mycol,blocks)) localsizes[1]++;
    char **localdata = allocchar2darray(localsizes[0],localsizes[1]);

    if (!strcasecmp(argv[1], "alltoall")) {
        if (rank == 0) printf("Method - alltoall
");
        alltoall(myrow, mycol, rank, size, blocks, blocksize, globalsizes, localsizes, globalptr, localdata);
    } else if (!strcasecmp(argv[1],"twophasevecs")) {
        if (rank == 0) printf("Method - two phase, vectors, then cleanup
");
        twophasevecs(myrow, mycol, rank, size, blocks, blocksize, globalsizes, localsizes, globalptr, localdata);
    } else {
        if (rank == 0) printf("Method - two phase - row, cols
");
        twophaseRowCol(myrow, mycol, rank, size, blocks, blocksize, globalsizes, localsizes, globalptr, localdata);
    }

    for (int proc=0; proc<size; proc++) {
        if (proc == rank) {
            printf("
Rank %d:
", proc);
            printarray(localdata, localsizes[0], localsizes[1]);
        }
        MPI_Barrier(MPI_COMM_WORLD);            
    }

    freechar2darray(localdata);
    if (rank == 0) 
        freechar2darray(globaldata);

    MPI_Finalize();

    return 0;
}

char **allocchar2darray(int n, int m) {
    char **ptrs = malloc(n*sizeof(char *));
    ptrs[0] = malloc(n*m*sizeof(char));
    for (int i=0; i<n*m; i++)
        ptrs[0][i]='.';

    for (int i=1; i<n; i++) 
        ptrs[i] = ptrs[i-1] + m;

    return ptrs;
}

void freechar2darray(char **a) {
    free(a[0]);
    free(a);
}

void printarray(char **data, int n, int m) {
    for (int i=0; i<n; i++) {
        for (int j=0; j<m; j++) 
            putchar(data[i][j]);
        putchar('\n');
    }
}

void rowcol(int rank, const int blocks[2], int *row, int *col) {
    *row = rank/blocks[1];
    *col = rank % blocks[1];
}

int isLastRow(int row, const int blocks[2]) {
    return (row == blocks[0]-1);
}

int isLastCol(int col, const int blocks[2]) {
    return (col == blocks[1]-1);
}

int typeIdx(int row, int col, const int blocks[2]) {
    int lastrow = (row == blocks[0]-1);
    int lastcol = (col == blocks[1]-1);

    return lastrow*2 + lastcol;
}
