#include #include #include #include "mpi.h" #define YES 1 #define NO 0 #define MAXARRAYSIZE 524288 /* #define MAXARRAYSIZE 262144 #define MAXARRAYSIZE 1048576 #define MAXARRAYSIZE 16777216 */ #define ONEITEM 1 #define MPIHEAD 0 #define MAXLONG 0x7fffffff #define BEOWULFSIZE 32 #define LONG long /*******************************************************************/ /* slight sloppiness */ /* we're going to have comm, commsize, myrank as global variables */ /* also the fplog output file so all our print routines go to a */ /* labelled file instead of to stdout or stderr */ /*******************************************************************/ int commsize,myrank; FILE *fplog; MPI_Comm comm; /*******************************************************************/ /* this is the structure for our local array data */ /*******************************************************************/ typedef struct { LONG *data; LONG numlocal; } parray; int compar_ascending(const void *a, const void *b); int compar_descending(const void *a, const void *b); void pacreate(parray *pa,LONG numlocal,MPI_Comm comm); void paload(parray *pa,MPI_Comm comm); void paprint(char *label,parray *pa,MPI_Comm comm); char *xmalloc(LONG howmany); #include "INC.MPI.tdecl.c" int main(int argc,char *argv[]) { char filename[101]; char procname[MPI_MAX_PROCESSOR_NAME]; int namelen,tag; LONG i,ii,lengtha,lengthb,lengthrecv,localmaxsize,localsamplepointer,maxsize; LONG partitionsize,ptra,ptrrecv,thispartition; LONG *dataa,*datab,*partitionsend,*partitionrecv; LONG globalsamples[(BEOWULFSIZE+1)*(BEOWULFSIZE+1)]; LONG firstsub[BEOWULFSIZE+1]; LONG lastsub[BEOWULFSIZE+1]; LONG localsamplessend[BEOWULFSIZE+1]; LONG localsamplesrecv[BEOWULFSIZE+1]; parray pa; MPI_Status status; /*******************************************************************/ /* do the mpi initialisation */ /*******************************************************************/ MPI_Init(&argc,&argv); comm = MPI_COMM_WORLD; MPI_Comm_size(comm,&commsize); MPI_Comm_rank(comm,&myrank); MPI_Get_processor_name(procname,&namelen); sprintf(filename,"zout.%d",myrank); fplog = fopen(filename,"w"); /*******************************************************************/ /* create and load the data into the local arrays */ /*******************************************************************/ partitionsize = MAXARRAYSIZE/commsize; timecall("beginning",fplog,myrank); pacreate(&pa,partitionsize,comm); #ifdef EBUGA paprint("Aaft create",&pa,comm); #endif paload(&pa,comm); #ifdef EBUGB paprint("Baft load",&pa,comm); #endif /*******************************************************************/ /* sort the local arrays */ /*******************************************************************/ qsort(&(pa.data[0]),pa.numlocal,sizeof(LONG),(*compar_ascending)); #ifdef EBUGC paprint("Cafter sort",&pa,comm); #endif /*******************************************************************/ /* sample locally */ /*******************************************************************/ localsamplepointer = 0; for(i = 1; i <= partitionsize; i += partitionsize/commsize) { localsamplessend[localsamplepointer] = pa.data[i-1]; localsamplepointer++; } #ifdef EBUGD fprintf(fplog,"D%2d:%2d:",myrank,commsize); for(i = 0; i < commsize; i++) { fprintf(fplog," %ld",localsamplessend[i]);; } fprintf(fplog,"\n"); fflush(fplog); #endif /*******************************************************************/ /* local processors send their samples to the head */ /* the head receives and packs a global sample array */ /* this last step should be done as a gather */ /*******************************************************************/ tag = 0; MPI_Barrier(comm); MPI_Send(&localsamplessend[0],commsize,MPI_LONG,MPIHEAD,tag,comm); MPI_Barrier(comm); if(MPIHEAD == myrank) { for(i = 0; i < commsize; i++) { MPI_Recv(&localsamplesrecv[0],commsize,MPI_LONG,i,tag,comm,&status); for(ii = 0; ii < commsize; ii++) { globalsamples[i*commsize + ii] = localsamplesrecv[ii]; } } #ifdef EBUGE for(i = 0; i < commsize; i++) { fprintf(fplog,"E%2d:%2d:",myrank,commsize); for(ii = 0; ii < commsize; ii++) { fprintf(fplog," %11ld",globalsamples[i*commsize+ii]); } fprintf(fplog,"\n"); fflush(fplog); } #endif /*******************************************************************/ /* sort on the head node the samples collected from local nodes */ /*******************************************************************/ qsort(globalsamples,commsize*commsize,sizeof(LONG),(*compar_ascending)); #ifdef EBUGF fprintf(fplog,"F%2d:%2d:",myrank,commsize); for(ii = 0; ii < commsize*commsize; ii++) { fprintf(fplog," %11ld",globalsamples[ii]); } fprintf(fplog,"\n"); fflush(fplog); #endif /*******************************************************************/ /* subsample for global samples with which to split the data */ /*******************************************************************/ for(i = 1; i <= commsize-1; i++) { ii = i*commsize + commsize/2 - 1; localsamplessend[i-1] = globalsamples[ii]; } #ifdef EBUGG fprintf(fplog,"G%2d:%2d:",myrank,commsize); for(i = 1; i <= commsize-1; i++) { ii = i*commsize + commsize/2 - 1; fprintf(fplog,"(%ld %ld)",ii,localsamplessend[i-1]);; } fprintf(fplog,"\n"); fflush(fplog); #endif } /* if(MPIHEAD == myrank) */ MPI_Barrier(comm); #ifdef EBUGH fprintf(fplog,"H%2d:%2d:",myrank,commsize); for(i = 1; i <= commsize-1; i++) { fprintf(fplog,"(%ld %ld)",i-1,localsamplessend[i-1]);; } fprintf(fplog,"\n"); fflush(fplog); #endif /*******************************************************************/ /* broadcast the subsample to all nodes */ /*******************************************************************/ localsamplessend[commsize-1] = MAXLONG; MPI_Bcast(&localsamplessend[0],commsize,MPI_LONG,MPIHEAD,comm); MPI_Barrier(comm); #ifdef EBUGI fprintf(fplog,"I%2d:%2d:",myrank,commsize); for(i = 1; i <= commsize; i++) { fprintf(fplog,"(%ld %ld)",i-1,localsamplessend[i-1]);; } fprintf(fplog,"\n"); fflush(fplog); #endif /*******************************************************************/ /* now comes the trickier part of sending data from everyone to */ /* everyone (but in fact to the right someone) */ /*******************************************************************/ /*******************************************************************/ /* first we determine the beginning and ending subscripts in each */ /* local array of the data to be sent to the other processors */ /*******************************************************************/ ii = 0; firstsub[ii] = 0; lastsub[ii] = 0; for(i = 0; i < pa.numlocal; i++) { if(pa.data[i] <= localsamplessend[ii]) { lastsub[ii] = i; } else { ii++; if(ii > commsize) break; firstsub[ii] = i; lastsub[ii] = 0; } } /*******************************************************************/ /* in each local processor, find the maximum subpartition size */ /*******************************************************************/ maxsize = 0; for(i = 0; i < commsize; i++) { localmaxsize = lastsub[i] - firstsub[i] + 1; if(localmaxsize > maxsize) maxsize = localmaxsize; } #ifdef EBUGJ for(ii = 0; ii < commsize; ii++) { fprintf(fplog,"JF%2d:%2d:%5ld:",myrank,commsize,maxsize); fprintf(fplog,"(%5ld:%5ld:%5ld) %11ld %11ld %11ld\n",ii,firstsub[ii], lastsub[ii] - firstsub[ii] + 1, pa.data[firstsub[ii]-1], pa.data[firstsub[ii] ], pa.data[firstsub[ii]+1]); fprintf(fplog,"JL%2d:%2d:%5ld:",myrank,commsize,maxsize); fprintf(fplog,"(%5ld:%5ld:%5ld) %11ld %11ld %11ld\n",ii,lastsub[ii], lastsub[ii] - firstsub[ii] + 1, pa.data[lastsub[ii]-1], pa.data[lastsub[ii] ], pa.data[lastsub[ii]+1]); } fprintf(fplog,"\n"); fflush(fplog); #endif /*******************************************************************/ /* and then allreduce to get the global max size to all nodes */ /* now we know the absolute max size for data sends */ /*******************************************************************/ localmaxsize = maxsize; MPI_Allreduce(&localmaxsize,&maxsize,ONEITEM,MPI_LONG,MPI_MAX,comm); #ifdef EBUGK fprintf(fplog,"K%2d:%2d:%ld\n",myrank,commsize,maxsize); #endif partitionsend = (LONG *) xmalloc((maxsize+1) * sizeof(LONG)); partitionrecv = (LONG *) xmalloc((maxsize+1) * sizeof(LONG)); /*******************************************************************/ /* there is probably a better way to do this, but time is short */ /* we're going to move our own subpartition's data into dataa */ /* we will receive data from the other processors into */ /* partitionrecv */ /* then we will merge dataa and partitionrecv into datab */ /* and finally free the dataa space and move the datab pointer */ /* to the dataa pointer to prepare for the next merge step */ /*******************************************************************/ /*******************************************************************/ /* first copy our subpartition data into dataa as the first */ /* "current" data for the merge step */ /*******************************************************************/ dataa = (LONG *) xmalloc((commsize*MAXARRAYSIZE+1)*sizeof(LONG)); lengtha = lastsub[myrank] - firstsub[myrank] + 1; for(i = 1; i <= lengtha; i++) dataa[i] = pa.data[firstsub[myrank] + i - 1]; dataa[0] = lengtha; /*******************************************************************/ /* we're going to do the data move node number by node number */ /* there well may be a better way to do this, but it would be more */ /* complicated, and time is short */ /*******************************************************************/ for( i = 1; i < commsize; i++) { thispartition = (myrank+i)%commsize; partitionsend[0] = lastsub[thispartition] - firstsub[thispartition] + 1; #ifdef EBUGL fprintf(fplog,"L%2d:%2d:(%ld %ld %ld) (%ld %ld)\n",myrank,commsize, i,thispartition,lengthrecv, firstsub[thispartition],lastsub[thispartition]); fflush(fplog); #endif /*******************************************************************/ /* copy our subpartition data into dataa as the first "current" */ /* data for the merge step */ /*******************************************************************/ for(ii = 0; ii < partitionsend[0]; ii++) { partitionsend[ii+1] = pa.data[firstsub[thispartition] + ii]; } #ifdef EBUGL for(ii = 0; ii < partitionsend[0]; ii++) { fprintf(fplog,"LL%2d:%2d (%5ld %8ld %8ld %8ld) %11ld\n",myrank,commsize,i,ii+1, maxsize,partitionsend[0],partitionsend[ii+1]); } fflush(fplog); #endif /*******************************************************************/ /* send my data for to proc i ahead of me, and receive from proc */ /* i behind me the data that actually should belong to me */ /*******************************************************************/ MPI_Send(&partitionsend[0],maxsize+1,MPI_LONG, (myrank+i)%commsize,tag,comm); MPI_Recv(&partitionrecv[0],maxsize+1,MPI_LONG, (commsize+myrank-i)%commsize,tag,comm,&status); lengthrecv = partitionrecv[0]; #ifdef EBUGM for(ii = 0; ii < lengthrecv; ii++) { fprintf(fplog,"M%2d:%2d:%ld:%ld:%ld:%ld", myrank,commsize,i,maxsize,lengthrecv,ii+1); fprintf(fplog," %ld\n",partitionrecv[ii+1]); } fflush(fplog); #endif /*******************************************************************/ /* malloc the new array with enough space for the merged list */ /*******************************************************************/ lengtha = dataa[0]; datab = (LONG *) malloc((lengtha+lengthrecv+1) * sizeof(LONG)); #ifdef EBUGN fprintf(fplog,"N%2d:%2d:%ld:%ld:%ld",myrank,commsize,i,maxsize,lengtha); for(ii = 0; ii < lengtha; ii++) { fprintf(fplog," %ld",dataa[ii+1]); } fprintf(fplog,"\n"); fflush(fplog); #endif #ifdef EBUGO fprintf(fplog,"O%2d:%2d:%ld:%ld:%ld",myrank,commsize,i,maxsize,lengthrecv); for(ii = 0; ii < lengthrecv; ii++) { fprintf(fplog," %ld",partitionrecv[ii+1]); } fprintf(fplog,"\n"); fflush(fplog); #endif /*******************************************************************/ /* now do the actual merge from dataa and partitionrecv into datab */ /*******************************************************************/ ptra = 1; ptrrecv = 1; lengthb = 1; while((ptra <= lengtha) | (ptrrecv <= lengthrecv)) { #ifdef EBUGP fprintf(fplog,"P%2d:%2d:%ld:%ld:%ld %ld %ld\n",myrank,commsize, ptra,ptrrecv,lengthb,dataa[ptra],partitionrecv[ptrrecv]); fflush(fplog); #endif if(ptra > lengtha) { datab[lengthb] = partitionrecv[ptrrecv]; ptrrecv++; } else if(ptrrecv > lengthrecv) { datab[lengthb] = dataa[ptra]; ptra++; } else if(dataa[ptra] < partitionrecv[ptrrecv]) { datab[lengthb] = dataa[ptra]; ptra++; } else { datab[lengthb] = partitionrecv[ptrrecv]; ptrrecv++; } lengthb++; } lengthb--; datab[0] = lengthb; #ifdef EBUGQ fprintf(fplog,"Q%2d:%2d:%ld:%ld:%ld",myrank,commsize,i,maxsize,lengthb); for(ii = 0; ii < lengthb; ii++) { fprintf(fplog," %ld",datab[ii+1]); } fprintf(fplog,"\n"); fflush(fplog); #endif /*******************************************************************/ /* trash the old list, rename the newer larger list, and repeat */ /*******************************************************************/ free(dataa); dataa = datab; } /* for( i = 1; i < commsize; i++) */ #ifdef EBUGR for( i = 0; i < commsize; i++) { MPI_Barrier(comm); if(myrank == i) { lengtha = dataa[0]; fprintf(fplog,"R%2d:%2d:%ld:%ld:%ld\n", myrank,commsize,i,maxsize,lengtha); fflush(fplog); } } fflush(fplog); #endif #ifdef EBUGX for( i = 0; i < commsize; i++) { MPI_Barrier(comm); if(myrank == i) { lengtha = dataa[0]; for(ii = 0; ii < lengtha; ii++) { fprintf(fplog,"%2d %11ld\n",myrank,dataa[ii+1]); } fflush(fplog); } } fflush(fplog); #endif MPI_Barrier(comm); timecall("ending",fplog,myrank); MPI_Finalize(); return 0; } /*******************************************************************/ /* NOTE: The use of the qsort utility requires care in defining */ /* the functions to make sure that they work correctly */ /*******************************************************************/ /*******************************************************************/ /* compar_ascending to sort into ascending order for qsort */ /*******************************************************************/ int compar_ascending(const void *a, const void *b) { LONG xa,xb; xa = *(LONG *)a; xb = *(LONG *)b; if(xa > xb) return(1); else if(xa < xb) return(-1); else return(0); } /*******************************************************************/ /* compar_ascending to sort into ascending order for qsort */ /*******************************************************************/ int compar_descending(const void *a, const void *b) { LONG xa,xb; xa = *(LONG *)a; xb = *(LONG *)b; if(xa > xb) return(-1); else if(xa < xb) return(1); else return(0); } /*******************************************************************/ /* pacreate to create the global parallel array structure */ /*******************************************************************/ void pacreate(parray *pa,LONG numlocal,MPI_Comm comm) { LONG i; pa->data = (LONG *) malloc((numlocal+1)*sizeof(LONG)); for(i = 0; i < numlocal; i++) pa->data[i] = -(myrank*10000 + i); pa->numlocal = numlocal; } /*******************************************************************/ /* paload to load the global parallel array structure */ /*******************************************************************/ void paload(parray *pa,MPI_Comm comm) { LONG i; #ifdef CREATEFILE char filename[101]; FILE *fopen(),*fp; sprintf(filename,"randomdata.%d",myrank); fp = fopen(filename,"w"); #endif srand((unsigned int) myrank+12345); for(i = 0; i < pa->numlocal; i++) { pa->data[i] = rand(); #ifdef CREATEFILE fprintf(fp,"%11ld\n",pa->data[i]); #endif } #ifdef CREATEFILE fclose(fp); #endif } /*******************************************************************/ /* paprint to print locally the parallel array structure */ /*******************************************************************/ void paprint(char *label,parray *pa,MPI_Comm comm) { LONG i,ii,iii; for(i = 0; i < commsize; i++) { MPI_Barrier(comm); if(myrank == i) { for(ii = 0; ii < pa->numlocal; ii++) { iii = myrank*MAXARRAYSIZE/commsize + ii; fprintf(fplog,"P%2d:%2d %s:%5ld %5ld %5ld %11ld\n", myrank,commsize,label,i,ii,iii,pa->data[ii]); fflush(fplog); } } } } char *xmalloc(LONG howmany) { char *thechar; thechar = (char *) malloc(howmany); if(thechar == (char *) 0) { fprintf(stderr,"ERROR malloc fails of %ld\n",howmany); exit(1); } fprintf(fplog,"%2d MALLOC succeeds of %ld\n",myrank,howmany); return(thechar); }