parallel sort using MPI


Problem description

I try to sort distributed arrays with MPI. Each array is allocated locally. For example we have {1-7-4-12} {3-7-5-9} {12-15-2-16} {10-8-11-13} and we want {1-2-3-4} {5-6-7-8} {9-10-11-12} {13-14-15-16}.

So I use the odd-even strategy. With 2 processes it works in every case, but when I try with more processes, values that were never in the input show up; for my example I can end up with {23-2-3-4}. I think my problem comes from memory allocation, but I can't find where and what I am doing wrong...
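The exchange pattern I am aiming for is the classic odd-even transposition, roughly like this (a sketch only; my actual code is below):

/* Sketch of the odd-even transposition pattern (the real code follows):
 *   even phases pair ranks (0,1), (2,3), ...
 *   odd  phases pair ranks (1,2), (3,4), ...
 * Each pair exchanges blocks, both sides merge, the lower rank keeps the
 * lower half and the higher rank keeps the upper half; after nbProcess
 * phases the distributed array is globally sorted. */
for (int phase = 0; phase < nbProcess; phase++) {
    int partner = (phase % 2 == id % 2) ? id + 1 : id - 1;
    if (partner >= 0 && partner < nbProcess) {
        /* exchange block with partner, merge, keep the proper half */
    }
}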

#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define MASTER 0
#define MIN(a,b) ((a)<(b)?(a):(b))

#define BLOCK_LOW(id,p,n)  ((id)*(n)/(p))

#define BLOCK_HIGH(id,p,n) \
        (BLOCK_LOW((id)+1,p,n)-1)

#define BLOCK_SIZE(id,p,n) \
        (BLOCK_LOW((id)+1, p, n)-BLOCK_LOW(id, p , n))

#define BLOCK_OWNER(index,p,n) \
        (((p)*(index+1)-1)/(n))

int nbProcess, id, n; //n = number of value

void printTabByProcess(int *T){
    int i = 0;
    int size = BLOCK_SIZE(id, nbProcess, n);
    printf("Tab n°%d [ ", id, size);
    for(i; i < size; i++){
        printf(" %d ", T[i]);
    }
    printf(" ]\n");
}

void fusion(int *t,int deb1,int fin1,int fin2){ 
    int *table1; 
    int deb2=fin1+1; 
    int compt1=deb1; 
    int compt2=deb2; 
    int i;
    table1=(int*)malloc((fin1-deb1+1)*sizeof(int)); 

    for(i=deb1;i<=fin1;i++) {
        table1[i-deb1]=t[i]; 
    }

    for(i=deb1;i<=fin2;i++){
        if(compt1==deb2)
            break;
        else if(compt2==(fin2+1)){
            t[i]=table1[compt1-deb1];
            compt1++;
        } 
        else if(table1[compt1-deb1]<t[compt2]){
            t[i]=table1[compt1-deb1];
            compt1++;
        } 
        else{
            t[i]=t[compt2];
            compt2++;
        }
    }
    free(table1);
}


void tri_fusion(int*t,int deb,int fin){
    if(deb!=fin){ 
        int milieu=(fin+deb)/2; 
        tri_fusion(t,deb,milieu);
        tri_fusion(t,milieu+1,fin);
        fusion(t,deb,milieu,fin);
    }
}

int* fusion2(int* t1, int* t2, int size1, int size2){
    int* buffer = malloc(sizeof(int)*(size1 + size2));
    int index1 = 0;
    int index2 = 0;
    int i = 0;
    for(i; i < (size1 + size2) - 1; i++){
        if(t1[index1] < t2[index2]){
            buffer[i] = t1[index1];
            index1++;
        }else{
            buffer[i] = t2[index2];
            index2++;
        }
    }
    if(index1 == size1 - 1 ){
        buffer[size1 + size2 - 1] = t1[index1];
    }else{
        buffer[size1 + size2 - 1] = t2[index2];
    }

    return buffer;
}
/*
*
* OUR FUNCTION TO PARALLEL SORT
*
*/
void TD_trier(int* T){
    MPI_Status status;
    int size = BLOCK_SIZE(id, nbProcess, n);
    int receive_size = 0;
    int* receive;
    int* array_tmp;
    int i = 0;
    tri_fusion(T, 0, size - 1);
    MPI_Barrier(MPI_COMM_WORLD);
    for(i; i < nbProcess; i++){
        if(i%2==0){
            if(id % 2 == 1){//send to left
                MPI_Send(&size, 1, MPI_INT, id - 1, 1, MPI_COMM_WORLD);
                MPI_Send(T, size, MPI_INT, id - 1, 1, MPI_COMM_WORLD);
                MPI_Recv(T, size, MPI_INT, id - 1, 1, MPI_COMM_WORLD, &status);
            }else {
                MPI_Recv(&receive_size, 1, MPI_INT, id + 1, 1, MPI_COMM_WORLD, &status);
                receive = malloc(sizeof(int) * size);
                MPI_Recv(receive, receive_size, MPI_INT, id + 1, 1, MPI_COMM_WORLD, &status);
                array_tmp = fusion2(T, receive, size, receive_size);
                MPI_Send(&array_tmp[size], receive_size, MPI_INT, id + 1, 1, MPI_COMM_WORLD);
                T = realloc(array_tmp, sizeof(int) * size);
            }
            if(id == 1){
                //~ printTabByProcess(T);
            }   
        }else if(i%2 == 1 && id < nbProcess-1){ //send to right
            if(id % 2 == 1){
                MPI_Send(&size, 1, MPI_INT, id + 1, 1, MPI_COMM_WORLD);
                MPI_Send(T, size, MPI_INT, id + 1, 1, MPI_COMM_WORLD);
                //printTabByProcess(T);
                MPI_Recv(T, size, MPI_INT, id + 1, 1, MPI_COMM_WORLD, &status);
            }else if(id != 0 && id%2 ==0) {
                MPI_Recv(&receive_size, 1, MPI_INT, id - 1, 1, MPI_COMM_WORLD, &status);
                //receive = malloc(sizeof(int) * size);
                MPI_Recv(receive, receive_size, MPI_INT, id - 1, 1, MPI_COMM_WORLD, &status);
                //printTabByProcess(receive);
                array_tmp = fusion2(T, receive, size, receive_size);
                MPI_Send(array_tmp, receive_size, MPI_INT, id - 1, 1, MPI_COMM_WORLD);
                printTabByProcess(&array_tmp[2]);
                T = array_tmp + size;
                printTabByProcess(T);
            }   
        }
        MPI_Barrier(MPI_COMM_WORLD);    
    }
    //printTabByProcess(T);

}

int generateRandomValue(){
    return rand() % 100; 
}
//init array with "random" value
int* TD_init(int n){
        int i = 0;
        int indiceDerniere = (id+1)*n/nbProcess -1;
        int indicePremiere = id*n/nbProcess;
        int* arrayLocal;
        int localSize = indiceDerniere - indicePremiere +1;
        arrayLocal = malloc(sizeof(int)*localSize);

        //~ printf("id : %d - nbCase : %d (debut : %d, fin : %d)\n", 
            //~ id, localSize, indicePremiere, indiceDerniere);

        for(i; i < localSize; i++){
            arrayLocal[i] = generateRandomValue() - id;
        }

        printTabByProcess(arrayLocal);

        return arrayLocal;
}

int main (int argc, char *argv[]){ 
    //int n = 0;
    int *dataLocal;
    int dest;
    int x;
    int success;

    MPI_Status status;

    srand(time(NULL));

    /***** Initializations *****/
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nbProcess); // numtask: contains the number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &id); // taskid: the rank of this process
    //~ printf ("MPI task %d has started...\n", id);
    //~ tag2 = 1;
    //~ tag1 = 2;

    MPI_Barrier (MPI_COMM_WORLD);

    /***** Master task only ******/
    if (id == MASTER){

        printf("Chose a number of value :");
        scanf("%d",&n);

        /* Send the number of cases */

        for (dest=1; dest<nbProcess; dest++) {
            MPI_Send(&n, 1, MPI_INT, dest, 1, MPI_COMM_WORLD); //send number of value
        }
    }  /* end of master section */

    /***** Non-master tasks only *****/
    if (id > MASTER) {
        /* Receive the number of cases */
        MPI_Recv(&n, 1, MPI_INT, MASTER, 1, MPI_COMM_WORLD, &status);   
    }
    MPI_Barrier (MPI_COMM_WORLD);

    dataLocal = TD_init(n);
    MPI_Barrier (MPI_COMM_WORLD);
    if(id == 0){
        printf("__________________________________________\n");
    }

    TD_trier(dataLocal);

    MPI_Finalize();
}

Answer

Troubles may come from the fusion2 function: index1 can become higher than size1. In fact, the MPI part works correctly, and the code works once the bounds checks below are added. Here is a version that is not optimal, but...

int* fusion2(int* t1, int* t2, int size1, int size2){
    int* buffer = malloc(sizeof(int) * (size1 + size2));
    int index1 = 0;
    int index2 = 0;
    int i;

    for(i = 0; i < size1 + size2; i++){
        if(index1 == size1){                 /* t1 exhausted: take from t2 */
            buffer[i] = t2[index2];
            index2++;
        }else if(index2 == size2){           /* t2 exhausted: take from t1 */
            buffer[i] = t1[index1];
            index1++;
        }else if(t1[index1] < t2[index2]){
            buffer[i] = t1[index1];
            index1++;
        }else{
            buffer[i] = t2[index2];
            index2++;
        }
    }

    return buffer;
}

Watch the memory management. For example: did you free T before doing the following?

T = realloc(array_tmp, sizeof(int) * size);
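
For illustration, one leak-free way to keep only the lower half after the merge might look like this (a sketch only; keep_low_half is not in the original code):

#include <stdlib.h>
#include <string.h>

/* Sketch only (keep_low_half is not in the original code): replace the
 * block pointed to by *T with the low `size` elements of `merged`,
 * freeing both old blocks so nothing is leaked. Assumes *T and merged
 * both come from malloc, as in the question. */
static void keep_low_half(int **T, int *merged, int size)
{
    int *low = malloc(sizeof(int) * size);
    memcpy(low, merged, sizeof(int) * size); /* keep the lower half      */
    free(merged);                            /* whole merged buffer gone */
    free(*T);                                /* old local block gone     */
    *T = low;
}

It would be called as keep_low_half(&T, array_tmp, size) in place of the realloc line. Note also that T inside TD_trier is only a local copy of the pointer passed by main, so reassigning it there does not update dataLocal.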

Did you free "receive"? Did you free "array_tmp" in the second part? I fear memory leaks exist... It might be better to avoid allocating inside fusion2, and even inside the loops. Allocating array_tmp and receive once at the start, with "enough" space, might be safer (and faster?).

Bye,

Francis

More: qsort (in stdlib) may be faster for the local sorting.
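
For example, a minimal sketch (compare_ints is not in the original code) of using qsort for the local sort in place of tri_fusion:

#include <stdlib.h>

/* Comparison callback for qsort (not in the original code). */
static int compare_ints(const void *a, const void *b)
{
    int x = *(const int *)a;
    int y = *(const int *)b;
    return (x > y) - (x < y); /* avoids the overflow risk of x - y */
}

/* Instead of tri_fusion(T, 0, size - 1): */
qsort(T, size, sizeof(int), compare_ints);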
