MPI Asynchronous/One-Sided Communication


Problem Description


I have a situation similar to the code below: worker processes work on a subset of data and must send an unknown amount of data back to the master. Is it possible to have the master wait and receive an unknown number of sends from the worker processes? Is there a way to do it using one-sided communication? Thanks in advance!

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

/*
    sample run/output:
    $mpirun -np 5 practice.exe
    @[1]: i=30
    @[2]: i=0
    @[2]: i=75
    @[4]: i=40
    @[4]: i=55
    @[3]: i=85
    @[3]: i=65
*/
int main(int argc, char *argv[])
{
    int i, rank, size, np, nw, num;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np -1;

    srand(time(NULL)*rank);

    if (rank > 0)
    {
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 5 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                // SEND num TO MASTER
            }
        }
    }
    else
    {
        // RECEIVE num FROM WORKER
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}

Solution

Sure, there are lots of ways to do this, but it doesn't really have anything to do with asynchronous communication. You can do it with one-sided communication, but even that has its own problems here (you still have to be able to guess how much total memory will be needed for the data).
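
Since the question asks about one-sided communication specifically, here is a minimal sketch of what that could look like with MPI-2 RMA (MPI_Win_create / MPI_Put / MPI_Win_fence). Note the assumption it bakes in: the master preallocates a fixed-size slot per worker (MAXPERWORKER here), which is exactly the guess-the-total-memory problem mentioned above. The fences are the synchronization; after the second fence the master can safely read what the workers put:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#define MAXPERWORKER 10   /* assumed upper bound per worker -- the "guess" */

int main(int argc, char *argv[])
{
    int i, j, rank, np, nw, num;
    int *slots = NULL, *counts = NULL;
    MPI_Win datawin, countwin;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np - 1;
    srand(time(NULL)*rank);

    /* master exposes one fixed-size data slot and one count per worker */
    if (rank == 0) {
        MPI_Alloc_mem(nw*MAXPERWORKER*sizeof(int), MPI_INFO_NULL, &slots);
        MPI_Alloc_mem(nw*sizeof(int), MPI_INFO_NULL, &counts);
        for (i=0; i<nw; i++) counts[i] = 0;
    }
    MPI_Win_create(slots,  rank==0 ? nw*MAXPERWORKER*sizeof(int) : 0,
                   sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &datawin);
    MPI_Win_create(counts, rank==0 ? nw*sizeof(int) : 0,
                   sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &countwin);

    MPI_Win_fence(0, datawin);
    MPI_Win_fence(0, countwin);
    if (rank > 0) {
        int mynums[MAXPERWORKER], numcount = 0;
        for (i=(rank-1); i<(nw*10); i+=nw) {
            num = rand() % 100;
            if (num % 3 == 0 && numcount < MAXPERWORKER)
                mynums[numcount++] = num;
        }
        /* put our data into our slot on the master, and the count beside it */
        MPI_Put(mynums, numcount, MPI_INT, 0,
                (rank-1)*MAXPERWORKER, numcount, MPI_INT, datawin);
        MPI_Put(&numcount, 1, MPI_INT, 0, rank-1, 1, MPI_INT, countwin);
    }
    MPI_Win_fence(0, datawin);
    MPI_Win_fence(0, countwin);

    if (rank == 0) {
        for (i=0; i<nw; i++) {
            printf("From [%2d]:", i+1);
            for (j=0; j<counts[i]; j++)
                printf("%3d ", slots[i*MAXPERWORKER + j]);
            printf("\n");
        }
        MPI_Free_mem(slots);
        MPI_Free_mem(counts);
    }
    MPI_Win_free(&datawin);
    MPI_Win_free(&countwin);
    MPI_Finalize();
    return EXIT_SUCCESS;
}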

One way to do it with plain point-to-point messages is simply to figure out how much data you have, send that count ahead to the master so it knows how many messages to receive, and then send your data one item at a time:

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#define MAXPERWORKER 10
#define TAG_NUM_INCOMING 1
#define TAG_DATA 2
int main(int argc, char *argv[])
{
    int i, rank, np, nw, num;
    int mynums[MAXPERWORKER], numcount, total;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np -1;

    srand(time(NULL)*rank);

    if (rank > 0)
    {
        numcount = 0;
        total    = 0;
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 3 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                mynums[numcount] = num;
                numcount++;
                total += num;
            }

        }
        /* of course, in this case we could just
         * do this in one message, but..
         */
        MPI_Send(&numcount, 1, MPI_INT, 0, TAG_NUM_INCOMING, MPI_COMM_WORLD);
        for (i=0; i<numcount; i++)
            MPI_Send(&(mynums[i]), 1, MPI_INT, 0, TAG_DATA, MPI_COMM_WORLD);

        printf("@[%d]: Total of all nums is %d\n", rank, total);
    }
    else
    {
        int *totals = malloc(sizeof(int)*nw);
        int *counts = malloc(sizeof(int)*nw);
        int *sofar  = malloc(sizeof(int)*nw);
        int **data = malloc(sizeof(int *)*nw);
        int rcv;
        int totalcounts = 0;   /* total items to receive; must start at zero */
        int j;
        int workernum;
        MPI_Status status;

        for (i=0; i<nw; i++) {
            sofar[i] = 0;
            totals[i]= 0;
        }

        /* get number of incoming messages */
        for (i=0; i<nw; i++) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, TAG_NUM_INCOMING, MPI_COMM_WORLD, &status);

            workernum = status.MPI_SOURCE-1;
            counts[workernum] = rcv;
            totalcounts += rcv;
            data[workernum] = malloc(sizeof(int)*rcv);
        }

        /* get real data */
        for (i=0; i<totalcounts; i++) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, TAG_DATA, MPI_COMM_WORLD, &status);
            workernum = status.MPI_SOURCE-1;
            data[ workernum ][ sofar[workernum]++ ] = rcv;
            totals[ workernum ] += rcv;
        }

        /* print results */
        for (i=0; i<nw; i++) {
            printf("From [%2d]:", i+1);
            for (j=0; j<counts[i]; j++)
                printf("%3d ", data[i][j]);
            printf("| %3d\n", totals[i]);
        }

        for (i=0; i<nw; i++)
            free(data[i]);
        free(data);
        free(totals);
        free(counts);
        free(sofar);
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}

Running this on 4 processes, I get:

$ mpirun -np 4 ./masterworker1

@[1]: i=39
@[1]: i=81
@[3]: i=9
@[3]: i=45
@[3]: i=0
@[3]: i=57
@[3]: Total of all nums is 111
@[1]: Total of all nums is 120
From [ 1]: 39  81 | 120
From [ 2]: 24   6  39 |  69
From [ 3]:  9  45   0  57 | 111
@[2]: i=24
@[2]: i=6
@[2]: i=39
@[2]: Total of all nums is 69

However, this might not be feasible -- you might not want to buffer all your data like this (and if you could, you could just send it in one message).
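
If you can buffer, here's a minimal sketch of that single-message variant; the trick that keeps the unknown size workable on the master side is MPI_Probe followed by MPI_Get_count, so the receive buffer can be sized after the message is already waiting:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#define MAXPERWORKER 10
#define TAG_DATA 2

int main(int argc, char *argv[])
{
    int i, j, rank, np, nw, num;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np - 1;
    srand(time(NULL)*rank);

    if (rank > 0) {
        int mynums[MAXPERWORKER], numcount = 0;
        for (i=(rank-1); i<(nw*10); i+=nw) {
            num = rand() % 100;
            if (num % 3 == 0 && numcount < MAXPERWORKER)
                mynums[numcount++] = num;
        }
        /* ship the whole buffer as a single message */
        MPI_Send(mynums, numcount, MPI_INT, 0, TAG_DATA, MPI_COMM_WORLD);
    } else {
        /* exactly one message per worker, in whatever order they arrive */
        for (i=0; i<nw; i++) {
            MPI_Status status;
            int incoming;
            /* probe first: the length is known before the receive is posted */
            MPI_Probe(MPI_ANY_SOURCE, TAG_DATA, MPI_COMM_WORLD, &status);
            MPI_Get_count(&status, MPI_INT, &incoming);
            int *buf = malloc(sizeof(int)*incoming);
            MPI_Recv(buf, incoming, MPI_INT, status.MPI_SOURCE, TAG_DATA,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            printf("From [%2d]:", status.MPI_SOURCE);
            for (j=0; j<incoming; j++)
                printf("%3d ", buf[j]);
            printf("\n");
            free(buf);
        }
    }

    MPI_Finalize();
    return EXIT_SUCCESS;
}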

Another approach is to send the data, then send a special message once you're done sending, and have the master just keep receiving until it's heard one of these "Done" messages from each worker:

#include <errno.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#define MAXPERWORKER 10
#define TAG_DATA 2
#define TAG_DONE 1
int main(int argc, char *argv[])
{
    int i, rank, np, nw, num;
    int total;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    nw = np -1;

    srand(time(NULL)*rank);

    if (rank > 0)
    {
        total = 0;
        for (i=(rank-1); i<(nw*10); i+=nw)
        {
            num = rand() % 100;
            if (num % 3 == 0)
            {
                printf("@[%d]: i=%d\n", rank, num);
                total += num;
                MPI_Send(&num, 1, MPI_INT, 0, TAG_DATA, MPI_COMM_WORLD);
            }

        }
        MPI_Send(&num, 1, MPI_INT, 0, TAG_DONE, MPI_COMM_WORLD);

        printf("@[%d]: Total of all nums is %d\n", rank, total);
    }
    else
    {
        int *totals = malloc(sizeof(int)*nw);
        int *counts = malloc(sizeof(int)*nw);
        int **data = malloc(sizeof(int *)*nw);
        int rcv;
        int j;
        int workernum;
        int stillsending;
        MPI_Status status;

        for (i=0; i<nw; i++) {
            totals[i]= 0;
            counts[i]= 0;
            data[i] = malloc(sizeof(int)*MAXPERWORKER);
        }
        stillsending = nw;

        /* get data */
        while (stillsending > 0) {
            MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

            workernum = status.MPI_SOURCE-1;
            if (status.MPI_TAG == TAG_DONE) {
                stillsending--;
            } else if (status.MPI_TAG == TAG_DATA) {
                data[workernum][counts[workernum]] = rcv;
                totals[workernum] += rcv;
                counts[workernum]++;
            }
        }

        /* print results */
        for (i=0; i<nw; i++) {
            printf("From [%2d]:", i+1);
            for (j=0; j<counts[i]; j++)
                printf("%3d ", data[i][j]);
            printf("| %3d\n", totals[i]);
        }

        for (i=0; i<nw; i++)
            free(data[i]);
        free(data);
        free(totals);
        free(counts);
    }

    MPI_Finalize();

    return EXIT_SUCCESS;
}

Again on 4 tasks, I get:

$ mpirun -np 4 ./masterworker2

@[1]: i=63
@[1]: i=99
@[1]: i=60
@[1]: i=69
@[1]: i=21
@[1]: i=48
@[1]: i=24
@[1]: Total of all nums is 384
@[2]: i=39
@[2]: i=84
@[2]: i=63
@[2]: Total of all nums is 186
@[3]: i=3
@[3]: i=51
@[3]: i=36
@[3]: Total of all nums is 90
From [ 1]: 63  99  60  69  21  48  24 | 384
From [ 2]: 39  84  63 | 186
From [ 3]:  3  51  36 |  90

Note that in both of these cases I've relied on a MAXPERWORKER-sized array to preallocate things; you don't really need this, though: you could malloc an array and realloc as necessary (a sketch of that follows), or use a std::vector if you're willing to use C++.
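
For instance, a minimal sketch of the realloc route (the helper name and the per-worker caps array below are just illustrative, not from the code above):

#include <stdio.h>
#include <stdlib.h>

/* Append one value to a growable int buffer, doubling capacity when full. */
void append(int **buf, int *count, int *capacity, int value)
{
    if (*count == *capacity) {
        int newcap = (*capacity > 0) ? *capacity * 2 : 4;
        int *grown = realloc(*buf, sizeof(int) * newcap);
        if (grown == NULL) {            /* realloc failed; bail out cleanly */
            fprintf(stderr, "out of memory\n");
            free(*buf);
            exit(EXIT_FAILURE);
        }
        *buf = grown;
        *capacity = newcap;
    }
    (*buf)[(*count)++] = value;
}

In the second version's receive loop, the master would then call append(&data[workernum], &counts[workernum], &caps[workernum], rcv) instead of writing into the fixed MAXPERWORKER slot, with each data[i] initialized to NULL and each caps[i] to zero.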
