MPI_Reduce select first k results


Problem description

I want to find the first k results over all nodes using MPI. For that I wanted to use MPI_Reduce with my own reduction function. However, my code does not work, because the len parameter passed to the function is not the same as the count parameter given to MPI_Reduce.
I found here that implementations may do this to pipeline the computation.

My code is similar to this one:

inline void MPI_user_select_top_k(acctbal_pair *invec, acctbal_pair *inoutvec, int *len, MPI_Datatype *dtpr) {
    std::vector<acctbal_pair> temp;
    for(int i = 0; i < *len; ++i) {
        acctbal_pair p1 = {invec[i].suppkey, invec[i].acctbal};
        acctbal_pair p2 = {inoutvec[i].suppkey, inoutvec[i].acctbal};
        temp.push_back(p1);
        temp.push_back(p2);
    }
    std::sort(temp.begin(), temp.end(), [&](acctbal_pair a, acctbal_pair b) { return a.acctbal > b.acctbal;});
    for(int i = 0; i < *len; ++i) {
        inoutvec[i].suppkey = temp[i].suppkey;
        inoutvec[i].acctbal = temp[i].acctbal;
    }
}

Where acctbal_pair is a struct with the fields suppkey and acctbal.
I call MPI_Reduce like this, where localResults and globalResults are vectors of size k:

MPI_Reduce(localResults.data(), globalResults.data(), k, mpi_suppkey_acctbal, select_top_k, ROOT, MPI_COMM_WORLD);

However, for slightly larger values of k, the count gets split into smaller chunks, making my function fail.

Is there any way to tell Reduce not to pipeline the computation? Or do you know another (efficient) way to implement this? I really don't want to use MPI_Gather and find the first k results on the root, because of the large communication overhead.

I cannot just create the function with a fixed parameter k (and treat all k elements as one MPI type), as k is computed at runtime.

I know that this is not the purpose of MPI_Reduce (which should just compute some operation element-wise), but this would work perfectly if count were not chunked.

P.S.: My MPI implementation is Open MPI.

Answer

Sure, you can do this - you just need to create a type of size k (which is easy enough to do at runtime) and do the selection within it. The only trick is that you don't have a way to pass state (e.g., k) to the selection operation, so you need to communicate it via a global variable - which is obviously not great, but one does what one needs to do. If you need to run the algorithm repeatedly with different values of k, you just create the type as needed and reset the global variable.

(You can avoid the global variable if you sneak the value of k into the selection operation some other way - say, by making the first element of the data passed to it in each array the value of k.)

Below is some code that does this; it allows for the case where a processor has fewer than k values. Each processor selects its k minimum values and stuffs them into its local array, and then the selection operation does a partial sorted merge to pick off just the k smallest elements.

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <limits.h>
#include <math.h>

const int invalid_data = -1;
static int globalselectk;        /* yuk */

int min2(int a, int b) {
    if (b < a)
        return b;
    return a;
}

MPI_Datatype createtype(int selectk) {

    MPI_Datatype selectktype;
    MPI_Type_contiguous(selectk, MPI_INT, &selectktype);
    MPI_Type_commit(&selectktype);

    return selectktype;
}

void initselectk(int *d, size_t selectk) {
    for (int i=0; i<selectk; i++)
        d[i] = invalid_data;
}

void printselectk(int *d, size_t selectk) {
    printf("[");
    for (int i=0; i<selectk; i++)
        printf("%3d ",d[i]);
    printf("] ");
}

int countselectk(int *d, size_t selectk) {
    int count = 0;

    while ( (count < selectk) && (d[count] != invalid_data) )  /* bounds check before dereference */
        count++;

    return count;
}
int mergeselect(int *d1, int *d2, int *dout, size_t selectk) {
    int count1 = countselectk(d1, selectk);
    int count2 = countselectk(d2, selectk);

    int count = 0;
    int total = count1+count2;
    if (total >= selectk) total = selectk;

    int idx1=0, idx2=0;

    while (count < total) {
        int minloc = -1;
        int minval = INT_MAX;
        if (idx1 < count1) {
            minloc = 1;
            minval = d1[idx1];
        }
        if ( (idx2 < count2) && (d2[idx2] < minval ) ) {
            minloc = 2;
            minval = d2[idx2];
        }
        dout[count++] = minval;
        if (minloc == 1)
            idx1++;
        else
            idx2++;
    }
    return count;
}

void selectop(void *in, void *inout, int *len, MPI_Datatype *type) {
    int *invals = (int *)in;
    int *inoutvals = (int *)inout;

    int out[globalselectk];

    for (int i=0; i<*len; i++) {
        initselectk(out, globalselectk);
        int count = mergeselect(invals, inoutvals, out, globalselectk);

        for (int j=0; j<count; j++)
            inoutvals[j] = out[j];

        invals += globalselectk;
        inoutvals += globalselectk;
    }

    return;
}

int intcompar(const void *v1, const void *v2) {
    int *i1 = (int *)v1;
    int *i2 = (int *)v2;

    return (*i1 - *i2);
}

int main(int argc, char **argv) {

    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (argc < 3) {
        fprintf(stderr,"Usage: %s localn k-to-select\n", argv[0]);
        MPI_Abort(MPI_COMM_WORLD,1);
    }

    int locn    = atoi(argv[1]);
    int selectk = atoi(argv[2]);
    globalselectk = selectk;     /* yuk */

    int localdata[locn];
    int local[selectk], global[selectk];

    /* create our new data type */
    MPI_Datatype mpi_datatype = createtype(selectk);

    MPI_Op mpi_selectop;
    MPI_Op_create(selectop, 1, &mpi_selectop);

    srand(rank*37);
    for (int i=0; i<locn; i++)
        localdata[i] = floor(500.*rand()/RAND_MAX);
    /* get our local k selected */
    /* could use quickselect for this, but to focus on the MPI, let's just sort */

    initselectk(local, selectk);
    qsort(localdata, locn, sizeof(int), intcompar);
    for (int i=0; i<min2(selectk,locn); i++)
        local[i] = localdata[i];

    for (int proc=0; proc<size; proc++) {
        if (rank == proc) {
            printf("Rank %2d has values: ",rank);
            for (int i=0; i<locn; i++)
                printf("%3d ", localdata[i]);
            printf("\n");
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    MPI_Reduce(local, global, 1, mpi_datatype, mpi_selectop, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        printf("Result is: \n");
        printselectk(global,selectk);
        printf("\n");
    }

    MPI_Op_free(&mpi_selectop);
    MPI_Type_free(&mpi_datatype);
    MPI_Finalize();

    return 0;
}

Compiling and running gives:

$ mpicc kselect.c -o kselect -Wall -std=c99 
$ mpirun -np 10 kselect 12 5
Rank  0 has values:  98 138 167 197 238 276 314 384 391 399 420 455 
Rank  1 has values:  16  87 119 134 156 164 225 299 321 380 409 441 
Rank  2 has values:  22  81 155 219 285 295 330 342 364 399 435 499 
Rank  3 has values:   3   7  75 164 181 271 285 358 379 438 466 491 
Rank  4 has values:   7  63  74 132 173 178 197 244 304 337 352 457 
Rank  5 has values:  21  62 104 138 240 346 377 382 411 446 455 482 
Rank  6 has values:  19  90 142 231 246 269 281 307 331 380 413 451 
Rank  7 has values:  43 191 193 232 236 331 399 429 439 445 446 457 
Rank  8 has values:  10 111 128 165 277 277 371 394 413 438 443 470 
Rank  9 has values:   2   2  34  57  97 105 128 187 265 329 344 409 
Result is: 
[  2   2   3   7   7 ] 

(A version without the global variable follows:)

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <limits.h>
#include <math.h>
#include <assert.h>

const int invalid_data = -1;

int min2(int a, int b) {
    if (b < a)
        return b;
    return a;
}

MPI_Datatype createtype(int selectk) {
    MPI_Datatype selectktype;
    MPI_Type_contiguous(selectk, MPI_INT, &selectktype);
    MPI_Type_commit(&selectktype);

    return selectktype;
}

void initselectk(int *d, int selectk) {
    d[0] = selectk;
    for (int i=1; i<selectk+1; i++)
        d[i] = invalid_data;
}

void printselectk(int *d) {
    int selectk = d[0];
    printf("[");
    for (int i=1; i<selectk+1; i++) 
        printf("%3d ",d[i]);
    printf("] ");
}

int countselectk(int *d) {
    int selectk = d[0];
    int count = 0;
    d++;

    while ( (count < selectk) && (d[count] != invalid_data) )  /* bounds check before dereference */
        count++;

    return count;
}

int mergeselect(int *d1, int *d2, int *dout) {
    int selectk = d1[0];
    assert(selectk == d2[0]);
    dout[0] = selectk;
    dout++;

    int count1 = countselectk(d1);
    int count2 = countselectk(d2);
    int total = count1 + count2;
    if (total >= selectk) total = selectk;

    int count = 0;
    int idx1=1, idx2=1;

    while (count < total) {
        int minloc = -1;
        int minval = INT_MAX;
        if (idx1 <= count1) {
            minloc = 1;
            minval = d1[idx1];
        }
        if ( (idx2 <= count2) && (d2[idx2] < minval ) ) {
            minloc = 2;
            minval = d2[idx2];
        } 
        dout[count++] = minval;
        if (minloc == 1)
            idx1++;
        else
            idx2++; 
    }
    return count;
}

void selectop(void *in, void *inout, int *len, MPI_Datatype *type) {
    int *invals = (int *)in;
    int *inoutvals = (int *)inout;


    for (int i=0; i<*len; i++) {
        int selectk = invals[0];
        assert(selectk == inoutvals[0]);

        int out[selectk+1];
        initselectk(out, selectk);
        int count = mergeselect(invals, inoutvals, out);
        for (int j=1; j<=count; j++) 
            inoutvals[j] = out[j];

        invals += selectk+1;
        inoutvals += selectk+1;
    }

    return;
}

int intcompar(const void *v1, const void *v2) {
    int *i1 = (int *)v1;
    int *i2 = (int *)v2;

    return (*i1 - *i2);
}

int main(int argc, char **argv) {

    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (argc < 3) {
        fprintf(stderr,"Usage: %s localn k-to-select\n", argv[0]);
        MPI_Abort(MPI_COMM_WORLD,1);
    }

    int locn    = atoi(argv[1]);
    int selectk = atoi(argv[2]);

    int localdata[locn];
    int local[selectk+1], global[selectk+1];

    /* create our new data type */
    MPI_Datatype mpi_datatype = createtype(selectk+1);

    MPI_Op mpi_selectop;
    MPI_Op_create(selectop, 1, &mpi_selectop);

    srand(rank*37);
    for (int i=0; i<locn; i++) 
        localdata[i] = floor(500.*rand()/RAND_MAX);

    /* get our local k selected */
    /* could use quickselect for this, but to focus on the MPI, let's just sort */

    initselectk(local, selectk);
    qsort(localdata, locn, sizeof(int), intcompar);
    for (int i=0; i<min2(selectk,locn); i++) 
        local[i+1] = localdata[i];

    for (int proc=0; proc<size; proc++) {
        if (rank == proc) {
            printf("Rank %2d has values: ",rank);
            for (int i=0; i<locn; i++)
                printf("%3d ", localdata[i]);
            printf("\n");
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    MPI_Reduce(local, global, 1, mpi_datatype, mpi_selectop, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        printf("Result is: \n");
        printselectk(global);
        printf("\n");
    }

    MPI_Op_free(&mpi_selectop);
    MPI_Type_free(&mpi_datatype);
    MPI_Finalize();

    return 0;
}
