Scatter matrix blocks of different sizes using MPI

(Assume all matrices are stored in row-major order.) An example illustrating the problem: distribute a 10x10 matrix over a 3x3 processor grid, so that the sizes of the submatrices on each node look like this:

    |-----+-----+-----|
    | 3x3 | 3x3 | 3x4 |
    |-----+-----+-----|
    | 3x3 | 3x3 | 3x4 |
    |-----+-----+-----|
    | 4x3 | 4x3 | 4x4 |
    |-----+-----+-----|
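
(For what it's worth, the sizes above come from giving every block row/column floor(N/P) rows/columns and folding the remainder into the last one; a tiny sketch of that rule, with made-up helper and parameter names, would be:)

    /* hypothetical helper: size of the block at grid position (row, col) when an
     * N x M matrix is split over a prows x pcols grid, with the remainder rows
     * and columns folded into the last block row / block column */
    void local_block_size(int N, int M, int prows, int pcols,
                          int row, int col, int *nrows, int *ncols)
    {
        *nrows = N / prows + ((row == prows - 1) ? N % prows : 0);
        *ncols = M / pcols + ((col == pcols - 1) ? M % pcols : 0);
    }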

I have seen many posts on Stack Overflow about this (for example, "sending blocks of 2D array in C using MPI" and "MPI partition matrix into blocks"), but they only deal with blocks of the same size (in that case we can simply use MPI_Type_vector or MPI_Type_create_subarray and a single MPI_Scatterv call).
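
For the equal-sized case I mean something roughly like this (just a sketch; N, P, pcols, blocksize, global and local are placeholder names, and every block is assumed to be exactly blocksize x blocksize):

    /* one resized subarray type describes every (identical) block */
    MPI_Datatype block, blocktype;
    int gsizes[2] = {N, N}, lsizes[2] = {blocksize, blocksize}, starts[2] = {0, 0};
    MPI_Type_create_subarray(2, gsizes, lsizes, starts, MPI_ORDER_C, MPI_CHAR, &block);
    MPI_Type_create_resized(block, 0, sizeof(char), &blocktype);  /* extent of 1 char */
    MPI_Type_commit(&blocktype);

    int counts[P], displs[P];
    for (int p = 0; p < P; p++) {
        counts[p] = 1;
        /* displacement, in chars, of each block's upper-left corner in the global array */
        displs[p] = (p / pcols) * blocksize * N + (p % pcols) * blocksize;
    }
    MPI_Scatterv(global, counts, displs, blocktype,
                 local, blocksize*blocksize, MPI_CHAR, 0, MPI_COMM_WORLD);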

So I am wondering: what is the most efficient way in MPI to scatter a matrix onto a processor grid where each processor gets a block of a specified size?

PS: I also looked at MPI_Type_create_darray, but it does not seem to allow specifying a different block size for each processor.

2 answers

To do this, you need to go through at least one additional step in MPI.

The problem is that with the most general of the scatter/gather routines, MPI_Scatterv and MPI_Gatherv, you can pass a "vector" (v) of counts and displacements, rather than just the single count of Scatter and Gather, but the types are all assumed to be the same. Here, there is no way around that; the memory layouts of the blocks are different, and so they have to be described by different types. If the blocks differed in only one way — some had a different number of columns, or some had a different number of rows — then different counts alone would be enough. But with different columns and rows, counts alone won't do it; you really need to be able to specify different types.

So what you really want is the often-discussed but never-implemented MPI_Scatterw (where w means vv; that is, both counts and types are vectors). But such a thing does not exist. The closest you can get is the much more general MPI_Alltoallw call, which allows a completely general all-to-all sending and receiving of data; as the spec states, "The MPI_ALLTOALLW function generalizes several MPI functions by carefully selecting the input arguments. For example, by making all but one process have sendcounts(i) = 0, this achieves an MPI_SCATTERW function.".
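
(To see the difference at a glance, here are the two prototypes as given in the MPI-3 standard — scatterv takes per-rank counts and displacements but a single send type, while alltoallw takes an array of types and byte displacements:)

    /* one send type for everyone; displacements in units of that type's extent */
    int MPI_Scatterv(const void *sendbuf, const int sendcounts[], const int displs[],
                     MPI_Datatype sendtype, void *recvbuf, int recvcount,
                     MPI_Datatype recvtype, int root, MPI_Comm comm);

    /* a count, displacement, and type per destination; displacements in bytes */
    int MPI_Alltoallw(const void *sendbuf, const int sendcounts[], const int sdispls[],
                      const MPI_Datatype sendtypes[], void *recvbuf,
                      const int recvcounts[], const int rdispls[],
                      const MPI_Datatype recvtypes[], MPI_Comm comm);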

So you can do this with MPI_Alltoallw by having all processes other than the one that originally has all the data (we'll assume it is rank 0 here) set all their send counts to zero. All tasks will also have all their receive counts set to zero, except the first one — the amount of data they will receive from rank 0.
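
A sketch of that receive-side setup — it mirrors the full listing further down — looks like this: every rank receives exactly one contiguous run of localsizes[0]*localsizes[1] chars, and only from rank 0.

    int recvcounts[ size ];
    int recvdispls[ size ];
    MPI_Datatype recvtypes[size];
    for (int proc=0; proc<size; proc++) {
        recvcounts[proc] = 0;        /* receive nothing from anyone... */
        recvdispls[proc] = 0;
        recvtypes[proc] = MPI_CHAR;
    }
    recvcounts[0] = localsizes[0]*localsizes[1];   /* ...except one block's worth of chars from rank 0 */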

For process 0's send side, we first need to define four different kinds of types (the 4 different subarray sizes); then the send counts will all be 1, and the only part that remains is figuring out the send displacements (which, unlike with scatterv, are here in units of bytes, because there is no single type one could use as a unit):

        /* 4 types of blocks -
         * blocksize*blocksize, (blocksize+1)*blocksize, blocksize*(blocksize+1), (blocksize+1)*(blocksize+1) */
        MPI_Datatype blocktypes[4];
        int subsizes[2];
        int starts[2] = {0,0};
        for (int i=0; i<2; i++) {
            subsizes[0] = blocksize+i;
            for (int j=0; j<2; j++) {
                subsizes[1] = blocksize+j;
                MPI_Type_create_subarray(2, globalsizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &blocktypes[2*i+j]);
                MPI_Type_commit(&blocktypes[2*i+j]);
            }
        }

        /* now figure out the displacement and type of each processor's data */
        for (int proc=0; proc<size; proc++) {
            int row, col;
            rowcol(proc, blocks, &row, &col);

            sendcounts[proc] = 1;
            senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize)*sizeof(char);

            int idx = typeIdx(row, col, blocks);
            sendtypes[proc] = blocktypes[idx];
        }
    }   /* end of the rank == 0 send-side setup */

    MPI_Alltoallw(globalptr, sendcounts, senddispls, sendtypes,
                  &(localdata[0][0]), recvcounts, recvdispls, recvtypes,
                  MPI_COMM_WORLD);

And it will work.

But the problem is that the Alltoallw function is so completely general that it is hard for implementations to do much in the way of optimization; so I would be surprised if this performed as well as a scatter of equally-sized blocks.

So another approach is to do something like two phases of communication.

The simplest such approach follows from noticing that you can almost get all the data where it needs to go with a single MPI_Scatterv() call: in your example, if we operate in units of a single column vector with columns = 1 and rows = 3 (the number of rows in most blocks of the domain), you can scatter almost all of the global data to the other processors. Each processor receives 3 or 4 of these vectors, which distributes all of the data except the very last row of the global array, which can be handled by a simple second scatterv. That looks like this:

    /* We're going to be operating mostly in units of a single column of a "normal" sized block.
     * There will need to be two vectors describing these columns; one in the context of the
     * global array, and one in the local results.
     */
    MPI_Datatype vec, localvec;
    MPI_Type_vector(blocksize, 1, localsizes[1], MPI_CHAR, &localvec);
    MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
    MPI_Type_commit(&localvec);

    MPI_Type_vector(blocksize, 1, globalsizes[1], MPI_CHAR, &vec);
    MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
    MPI_Type_commit(&vec);

    /* The originating process needs to allocate and fill the source array,
     * and then define types defining the array chunks to send, and
     * fill out senddispls, sendcounts (1) and sendtypes.
     */
    if (rank == 0) {
        /* create the vector type which will send one column of a "normal" sized block */
        /* then all processors except those in the last row need to get blocksize*vec or (blocksize+1)*vec */
        /* will still have to do something to tidy up the last row of values */
        /* we need to make the type have extent of 1 char for scattering */

        for (int proc=0; proc<size; proc++) {
            int row, col;
            rowcol(proc, blocks, &row, &col);

            sendcounts[proc] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
            senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize);
        }
    }

    recvcounts = localsizes[1];
    MPI_Scatterv(globalptr, sendcounts, senddispls, vec,
                 &(localdata[0][0]), recvcounts, localvec, 0, MPI_COMM_WORLD);

    MPI_Type_free(&localvec);
    if (rank == 0) MPI_Type_free(&vec);

    /* now we need to do one more scatter, scattering just the last row of data
     * just to the processors on the last row.
     * Here we recompute the send counts
     */
    if (rank == 0) {
        for (int proc=0; proc<size; proc++) {
            int row, col;
            rowcol(proc, blocks, &row, &col);
            sendcounts[proc] = 0;
            senddispls[proc] = 0;

            if ( isLastRow(row,blocks) ) {
                sendcounts[proc] = blocksize;
                senddispls[proc] = (globalsizes[0]-1)*globalsizes[1]+col*blocksize;
                if ( isLastCol(col,blocks) )
                    sendcounts[proc] += 1;
            }
        }
    }

    recvcounts = 0;
    if ( isLastRow(myrow, blocks) ) {
        recvcounts = blocksize;
        if ( isLastCol(mycol, blocks) )
            recvcounts++;
    }
    MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
                 &(localdata[blocksize][0]), recvcounts, MPI_CHAR, 0, MPI_COMM_WORLD);

So far so good. But it is a shame to have most of the processors sitting idle during that final, "cleanup" scatterv.

So a nicer approach is to scatter all the rows in a first phase and then scatter that data among the columns in a second phase. Here we create new communicators, with each processor belonging to two of them — one containing the other processors in the same block row, and one containing those in the same block column. In the first step, the origin processor distributes all the rows of the global array to the other processors in the same column communicator, which can be done in a single scatterv. Then those processors, using a single scatterv and the same column data type as in the previous example, scatter the columns to each processor in the same block row as it. The result is two fairly simple scattervs distributing all of the data:

    /* create communicators which have processors with the same row or column in them */
    MPI_Comm colComm, rowComm;
    MPI_Comm_split(MPI_COMM_WORLD, myrow, rank, &rowComm);
    MPI_Comm_split(MPI_COMM_WORLD, mycol, rank, &colComm);

    /* first, scatter the array by rows, with the processor in column 0 corresponding to each row
     * receiving the data */
    if (mycol == 0) {
        int sendcounts[ blocks[0] ];
        int senddispls[ blocks[0] ];
        senddispls[0] = 0;

        for (int row=0; row<blocks[0]; row++) {
            /* each processor gets blocksize rows, each of size globalsizes[1]... */
            sendcounts[row] = blocksize*globalsizes[1];
            if (row > 0)
                senddispls[row] = senddispls[row-1] + sendcounts[row-1];
        }
        /* the last processor gets one more */
        sendcounts[blocks[0]-1] += globalsizes[1];

        /* allocate my rowdata */
        rowdata = allocchar2darray( sendcounts[myrow], globalsizes[1] );

        /* perform the scatter of rows */
        MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
                     &(rowdata[0][0]), sendcounts[myrow], MPI_CHAR, 0, colComm);
    }

    /* Now, within each row of processors, we can scatter the columns.
     * We can do this as we did in the previous example; create a vector
     * (and localvector) type and scatter accordingly */
    int locnrows = blocksize;
    if ( isLastRow(myrow, blocks) )
        locnrows++;

    MPI_Datatype vec, localvec;
    MPI_Type_vector(locnrows, 1, globalsizes[1], MPI_CHAR, &vec);
    MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
    MPI_Type_commit(&vec);

    MPI_Type_vector(locnrows, 1, localsizes[1], MPI_CHAR, &localvec);
    MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
    MPI_Type_commit(&localvec);

    int sendcounts[ blocks[1] ];
    int senddispls[ blocks[1] ];
    if (mycol == 0) {
        for (int col=0; col<blocks[1]; col++) {
            sendcounts[col] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
            senddispls[col] = col*blocksize;
        }
    }
    char *rowptr = (mycol == 0) ? &(rowdata[0][0]) : NULL;

    MPI_Scatterv(rowptr, sendcounts, senddispls, vec,
                 &(localdata[0][0]), sendcounts[mycol], localvec, 0, rowComm);

which is simpler and should be a relatively good balance between performance and robustness.

All three of these methods work:

    bash-3.2$ mpirun -np 6 ./allmethods alltoall
    Global array:
    abcdefg
    hijklmn
    opqrstu
    vwxyzab
    cdefghi
    jklmnop
    qrstuvw
    xyzabcd
    efghijk
    lmnopqr
    Method - alltoall

    Rank 0:
    abc
    hij
    opq

    Rank 1:
    defg
    klmn
    rstu

    Rank 2:
    vwx
    cde
    jkl

    Rank 3:
    yzab
    fghi
    mnop

    Rank 4:
    qrs
    xyz
    efg
    lmn

    Rank 5:
    tuvw
    abcd
    hijk
    opqr

    bash-3.2$ mpirun -np 6 ./allmethods twophasevecs
    Global array:
    abcdefg
    hijklmn
    opqrstu
    vwxyzab
    cdefghi
    jklmnop
    qrstuvw
    xyzabcd
    efghijk
    lmnopqr
    Method - two phase, vectors, then cleanup

    Rank 0:
    abc
    hij
    opq

    Rank 1:
    defg
    klmn
    rstu

    Rank 2:
    vwx
    cde
    jkl

    Rank 3:
    yzab
    fghi
    mnop

    Rank 4:
    qrs
    xyz
    efg
    lmn

    Rank 5:
    tuvw
    abcd
    hijk
    opqr

    bash-3.2$ mpirun -np 6 ./allmethods twophaserowcol
    Global array:
    abcdefg
    hijklmn
    opqrstu
    vwxyzab
    cdefghi
    jklmnop
    qrstuvw
    xyzabcd
    efghijk
    lmnopqr
    Method - two phase - row, cols

    Rank 0:
    abc
    hij
    opq

    Rank 1:
    defg
    klmn
    rstu

    Rank 2:
    vwx
    cde
    jkl

    Rank 3:
    yzab
    fghi
    mnop

    Rank 4:
    qrs
    xyz
    efg
    lmn

    Rank 5:
    tuvw
    abcd
    hijk
    opqr

What follows is the code implementing all of these methods; you can set the block sizes to something more typical of your problem and run on a realistic number of processors to get some idea of which will be best for your application.

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "mpi.h"

    /* auxiliary routines, found at end of program */
    char **allocchar2darray(int n, int m);
    void freechar2darray(char **a);
    void printarray(char **data, int n, int m);
    void rowcol(int rank, const int blocks[2], int *row, int *col);
    int isLastRow(int row, const int blocks[2]);
    int isLastCol(int col, const int blocks[2]);
    int typeIdx(int row, int col, const int blocks[2]);

    /* first method - alltoallw */
    void alltoall(const int myrow, const int mycol, const int rank, const int size,
                  const int blocks[2], const int blocksize, const int globalsizes[2],
                  const int localsizes[2], const char *const globalptr, char **localdata) {
        /*
         * get send and receive counts ready for the alltoallw call.
         * everyone will be receiving just one block from proc 0;
         * most procs will be sending nothing to anyone.
         */
        int sendcounts[ size ];
        int senddispls[ size ];
        MPI_Datatype sendtypes[size];
        int recvcounts[ size ];
        int recvdispls[ size ];
        MPI_Datatype recvtypes[size];

        for (int proc=0; proc<size; proc++) {
            recvcounts[proc] = 0;
            recvdispls[proc] = 0;
            recvtypes[proc] = MPI_CHAR;

            sendcounts[proc] = 0;
            senddispls[proc] = 0;
            sendtypes[proc] = MPI_CHAR;
        }
        recvcounts[0] = localsizes[0]*localsizes[1];
        recvdispls[0] = 0;

        /* The originating process needs to allocate and fill the source array,
         * and then define types describing the array chunks to send, and
         * fill out senddispls, sendcounts (1) and sendtypes.
         */
        if (rank == 0) {
            /* 4 types of blocks -
             * blocksize*blocksize, (blocksize+1)*blocksize, blocksize*(blocksize+1), (blocksize+1)*(blocksize+1) */
            MPI_Datatype blocktypes[4];
            int subsizes[2];
            int starts[2] = {0,0};
            for (int i=0; i<2; i++) {
                subsizes[0] = blocksize+i;
                for (int j=0; j<2; j++) {
                    subsizes[1] = blocksize+j;
                    MPI_Type_create_subarray(2, globalsizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &blocktypes[2*i+j]);
                    MPI_Type_commit(&blocktypes[2*i+j]);
                }
            }

            /* now figure out the displacement and type of each processor's data */
            for (int proc=0; proc<size; proc++) {
                int row, col;
                rowcol(proc, blocks, &row, &col);

                sendcounts[proc] = 1;
                senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize)*sizeof(char);

                int idx = typeIdx(row, col, blocks);
                sendtypes[proc] = blocktypes[idx];
            }
        }

        MPI_Alltoallw(globalptr, sendcounts, senddispls, sendtypes,
                      &(localdata[0][0]), recvcounts, recvdispls, recvtypes,
                      MPI_COMM_WORLD);
    }

    /* second method: distribute almost all data using columns of size blocksize,
     * then clean up the last row with another scatterv */
    void twophasevecs(const int myrow, const int mycol, const int rank, const int size,
                      const int blocks[2], const int blocksize, const int globalsizes[2],
                      const int localsizes[2], const char *const globalptr, char **localdata) {
        int sendcounts[ size ];
        int senddispls[ size ];
        int recvcounts;

        for (int proc=0; proc<size; proc++) {
            sendcounts[proc] = 0;
            senddispls[proc] = 0;
        }

        /* We're going to be operating mostly in units of a single column of a "normal" sized block.
         * There will need to be two vectors describing these columns; one in the context of the
         * global array, and one in the local results.
         */
        MPI_Datatype vec, localvec;
        MPI_Type_vector(blocksize, 1, localsizes[1], MPI_CHAR, &localvec);
        MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
        MPI_Type_commit(&localvec);

        MPI_Type_vector(blocksize, 1, globalsizes[1], MPI_CHAR, &vec);
        MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
        MPI_Type_commit(&vec);

        /* The originating process needs to allocate and fill the source array,
         * and then define types describing the array chunks to send, and
         * fill out senddispls, sendcounts (1) and sendtypes.
         */
        if (rank == 0) {
            /* create the vector type which will send one column of a "normal" sized block */
            /* then all processors except those in the last row need to get blocksize*vec or (blocksize+1)*vec */
            /* will still have to do something to tidy up the last row of values */
            /* we need to make the type have extent of 1 char for scattering */

            for (int proc=0; proc<size; proc++) {
                int row, col;
                rowcol(proc, blocks, &row, &col);

                sendcounts[proc] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
                senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize);
            }
        }

        recvcounts = localsizes[1];
        MPI_Scatterv(globalptr, sendcounts, senddispls, vec,
                     &(localdata[0][0]), recvcounts, localvec, 0, MPI_COMM_WORLD);

        MPI_Type_free(&localvec);
        if (rank == 0) MPI_Type_free(&vec);

        /* now we need to do one more scatter, scattering just the last row of data
         * just to the processors on the last row.
         * Here we recompute the sendcounts
         */
        if (rank == 0) {
            for (int proc=0; proc<size; proc++) {
                int row, col;
                rowcol(proc, blocks, &row, &col);
                sendcounts[proc] = 0;
                senddispls[proc] = 0;

                if ( isLastRow(row,blocks) ) {
                    sendcounts[proc] = blocksize;
                    senddispls[proc] = (globalsizes[0]-1)*globalsizes[1]+col*blocksize;
                    if ( isLastCol(col,blocks) )
                        sendcounts[proc] += 1;
                }
            }
        }

        recvcounts = 0;
        if ( isLastRow(myrow, blocks) ) {
            recvcounts = blocksize;
            if ( isLastCol(mycol, blocks) )
                recvcounts++;
        }
        MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
                     &(localdata[blocksize][0]), recvcounts, MPI_CHAR, 0, MPI_COMM_WORLD);
    }

    /* third method: first distribute rows, then columns, each with a single scatterv */
    void twophaseRowCol(const int myrow, const int mycol, const int rank, const int size,
                        const int blocks[2], const int blocksize, const int globalsizes[2],
                        const int localsizes[2], const char *const globalptr, char **localdata) {
        char **rowdata ;

        /* create communicators which have processors with the same row or column in them */
        MPI_Comm colComm, rowComm;
        MPI_Comm_split(MPI_COMM_WORLD, myrow, rank, &rowComm);
        MPI_Comm_split(MPI_COMM_WORLD, mycol, rank, &colComm);

        /* first, scatter the array by rows, with the processor in column 0 corresponding to each row
         * receiving the data */
        if (mycol == 0) {
            int sendcounts[ blocks[0] ];
            int senddispls[ blocks[0] ];
            senddispls[0] = 0;

            for (int row=0; row<blocks[0]; row++) {
                /* each processor gets blocksize rows, each of size globalsizes[1]... */
                sendcounts[row] = blocksize*globalsizes[1];
                if (row > 0)
                    senddispls[row] = senddispls[row-1] + sendcounts[row-1];
            }
            /* the last processor gets one more */
            sendcounts[blocks[0]-1] += globalsizes[1];

            /* allocate my rowdata */
            rowdata = allocchar2darray( sendcounts[myrow], globalsizes[1] );

            /* perform the scatter of rows */
            MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
                         &(rowdata[0][0]), sendcounts[myrow], MPI_CHAR, 0, colComm);
        }

        /* Now, within each row of processors, we can scatter the columns.
         * We can do this as we did in the previous example; create a vector
         * (and localvector) type and scatter accordingly */
        int locnrows = blocksize;
        if ( isLastRow(myrow, blocks) )
            locnrows++;

        MPI_Datatype vec, localvec;
        MPI_Type_vector(locnrows, 1, globalsizes[1], MPI_CHAR, &vec);
        MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
        MPI_Type_commit(&vec);

        MPI_Type_vector(locnrows, 1, localsizes[1], MPI_CHAR, &localvec);
        MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
        MPI_Type_commit(&localvec);

        int sendcounts[ blocks[1] ];
        int senddispls[ blocks[1] ];
        if (mycol == 0) {
            for (int col=0; col<blocks[1]; col++) {
                sendcounts[col] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
                senddispls[col] = col*blocksize;
            }
        }
        char *rowptr = (mycol == 0) ? &(rowdata[0][0]) : NULL;

        MPI_Scatterv(rowptr, sendcounts, senddispls, vec,
                     &(localdata[0][0]), sendcounts[mycol], localvec, 0, rowComm);

        MPI_Type_free(&localvec);
        MPI_Type_free(&vec);

        if (mycol == 0) freechar2darray(rowdata);

        MPI_Comm_free(&rowComm);
        MPI_Comm_free(&colComm);
    }

    int main(int argc, char **argv) {

        int rank, size;
        int blocks[2] = {0,0};
        const int blocksize=3;
        int globalsizes[2], localsizes[2];
        char **globaldata;
        char *globalptr = NULL;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &size);

        if (rank == 0 && argc < 2) {
            fprintf(stderr,"Usage: %s method\n Where method is one of: alltoall, twophasevecs, twophaserowcol\n", argv[0]);
            MPI_Abort(MPI_COMM_WORLD,1);
        }

        /* calculate sizes for a 2d grid of processors */
        MPI_Dims_create(size, 2, blocks);

        int myrow, mycol;
        rowcol(rank, blocks, &myrow, &mycol);

        /* create array sizes so that last block has 1 too many rows/cols */
        globalsizes[0] = blocks[0]*blocksize+1;
        globalsizes[1] = blocks[1]*blocksize+1;
        if (rank == 0) {
            globaldata = allocchar2darray(globalsizes[0], globalsizes[1]);
            globalptr = &(globaldata[0][0]);
            for (int i=0; i<globalsizes[0]; i++)
                for (int j=0; j<globalsizes[1]; j++)
                    globaldata[i][j] = 'a'+(i*globalsizes[1] + j)%26;

            printf("Global array: \n");
            printarray(globaldata, globalsizes[0], globalsizes[1]);
        }

        /* the local chunk we'll be receiving */
        localsizes[0] = blocksize; localsizes[1] = blocksize;
        if ( isLastRow(myrow,blocks)) localsizes[0]++;
        if ( isLastCol(mycol,blocks)) localsizes[1]++;
        char **localdata = allocchar2darray(localsizes[0],localsizes[1]);

        if (!strcasecmp(argv[1], "alltoall")) {
            if (rank == 0) printf("Method - alltoall\n");
            alltoall(myrow, mycol, rank, size, blocks, blocksize, globalsizes, localsizes, globalptr, localdata);
        } else if (!strcasecmp(argv[1],"twophasevecs")) {
            if (rank == 0) printf("Method - two phase, vectors, then cleanup\n");
            twophasevecs(myrow, mycol, rank, size, blocks, blocksize, globalsizes, localsizes, globalptr, localdata);
        } else {
            if (rank == 0) printf("Method - two phase - row, cols\n");
            twophaseRowCol(myrow, mycol, rank, size, blocks, blocksize, globalsizes, localsizes, globalptr, localdata);
        }

        for (int proc=0; proc<size; proc++) {
            if (proc == rank) {
                printf("\nRank %d:\n", proc);
                printarray(localdata, localsizes[0], localsizes[1]);
            }
            MPI_Barrier(MPI_COMM_WORLD);
        }

        freechar2darray(localdata);
        if (rank == 0) freechar2darray(globaldata);

        MPI_Finalize();
        return 0;
    }

    char **allocchar2darray(int n, int m) {
        char **ptrs = malloc(n*sizeof(char *));
        ptrs[0] = malloc(n*m*sizeof(char));
        for (int i=0; i<n*m; i++)
            ptrs[0][i]='.';

        for (int i=1; i<n; i++)
            ptrs[i] = ptrs[i-1] + m;
        return ptrs;
    }

    void freechar2darray(char **a) {
        free(a[0]);
        free(a);
    }

    void printarray(char **data, int n, int m) {
        for (int i=0; i<n; i++) {
            for (int j=0; j<m; j++)
                putchar(data[i][j]);
            putchar('\n');
        }
    }

    void rowcol(int rank, const int blocks[2], int *row, int *col) {
        *row = rank/blocks[1];
        *col = rank % blocks[1];
    }

    int isLastRow(int row, const int blocks[2]) {
        return (row == blocks[0]-1);
    }

    int isLastCol(int col, const int blocks[2]) {
        return (col == blocks[1]-1);
    }

    int typeIdx(int row, int col, const int blocks[2]) {
        int lastrow = (row == blocks[0]-1);
        int lastcol = (col == blocks[1]-1);

        return lastrow*2 + lastcol;
    }

Not sure if this applies to you, but it has helped me in the past, so it may be useful to others.

My answer applies in the context of parallel I/O. The key point is that if you know your accesses do not overlap, you can successfully write/read even with variable sizes by using MPI_COMM_SELF.

A piece of code that I use every day contains:

    MPI_File fh;
    MPI_File_open(MPI_COMM_SELF, path.c_str(), MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);

    // Lot of computation to get the size right

    MPI_Datatype filetype;
    MPI_Type_create_subarray(gsizes.size(), &gsizes[0], &lsizes[0], &offset[0],
                             MPI_ORDER_C, MPI_FLOAT, &filetype);
    MPI_Type_commit(&filetype);

    MPI_File_set_view(fh, 0, MPI_FLOAT, filetype, "native", MPI_INFO_NULL);
    MPI_File_write(fh, &block->field[0], block->field.size(), MPI_FLOAT, MPI_STATUS_IGNORE);
    MPI_File_close(&fh);
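
A matching read is the same idea; here is just a sketch under the same assumptions (ndims, gsizes, lsizes, offset, local_buf, and local_count are stand-ins for whatever describes your rank's non-overlapping chunk):

    /* read back this rank's chunk of the shared file, again via MPI_COMM_SELF */
    MPI_File fh;
    MPI_File_open(MPI_COMM_SELF, filename, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);

    MPI_Datatype filetype;
    MPI_Type_create_subarray(ndims, gsizes, lsizes, offset,
                             MPI_ORDER_C, MPI_FLOAT, &filetype);
    MPI_Type_commit(&filetype);

    MPI_File_set_view(fh, 0, MPI_FLOAT, filetype, "native", MPI_INFO_NULL);
    MPI_File_read(fh, local_buf, local_count, MPI_FLOAT, MPI_STATUS_IGNORE);

    MPI_Type_free(&filetype);
    MPI_File_close(&fh);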


