src/Message/CommSHMEMPI.cpp

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 /***************************************************************************
00003  *
00004  * The IPPL Framework
00005  * 
00006  * This program was prepared by PSI. 
00007  * All rights in the program are reserved by PSI.
00008  * Neither PSI nor the author(s)
00009  * makes any warranty, express or implied, or assumes any liability or
00010  * responsibility for the use of this software
00011  *
00012  * Visit http://www.acl.lanl.gov/POOMS for more details
00013  *
00014  ***************************************************************************/
00015 
00016 // -*- C++ -*-
00017 /***************************************************************************
00018  *
00019  * The IPPL Framework
00020  * 
00021  *
00022  * Visit http://people.web.psi.ch/adelmann/ for more details
00023  *
00024  ***************************************************************************/
00025 
00026 // include files
00027 #include "Message/CommSHMEMPI.h"
00028 #include "Message/Message.h"
00029 #include "Utility/IpplInfo.h"
00030 #include "Profile/Profiler.h"
00031 
00032 // include mpi header file.
00033 #include <mpi.h>
00034 
00035 #include <string.h>
00036 #include <stdlib.h>
00037 #include <unistd.h>
00038 
00039 
// If a probe error occurs in myreceive more than this many times without an
// intervening successful receive, CommSHMEMPI calls Ippl::abort().  Make it
// negative (or zero) to totally disable checking for a maximum number of
// errors.
#define MAX_SHMEMPI_ERRS        500


// Number of probe errors seen by myreceive since its last successful
// receive; compared against MAX_SHMEMPI_ERRS.
static int numErrors = 0;

// Size in bytes of MPI_INT, filled in by the constructor via MPI_Type_size;
// needed only for TAU message tracing.
static int size_of_SHMEMPI_INT;

// Temporary send buffer used for speed: messages whose packed size fits in
// PACKSIZE bytes are packed here instead of into a freshly allocated buffer.
// PSIZE is fully parenthesized so it expands safely inside any expression
// (e.g. `x % PSIZE`).
#define PSIZE (1024*16)
#define PACKSIZE ((PSIZE)*sizeof(long))
static long shmempipackbuf[PSIZE];
00054 
00055 
00056 
00058 // constructor.   arguments: command-line args, and number of processes
00059 // to start (if < 0, start the 'default' number ... for MPI, this value
00060 // will be ignored, since the number of nodes is determined by the args
00061 // to mpirun.
00062 CommSHMEMPI::CommSHMEMPI(int& argc , char**& argv, int procs)
00063   : Communicate(argc, argv, procs) {
00064   TAU_PROFILE("CommSHMEMPI::CommSHMEMPI()", "void (int, char **, int)",
00065               TAU_MESSAGE);
00066 
00067   int i, reported, rep_host, ierror, result_len;
00068   MPI_Status stat;
00069   char *currtok, *nexttok, *execname;
00070 
00071   // a little "string magic" to strip the absolute pathname off the executable
00072   currtok = strstr(argv[0],"/");
00073   if (!currtok) {
00074     execname = strdup(argv[0]);
00075   } else {
00076     currtok++;
00077     nexttok = strstr(currtok,"/");
00078     while (nexttok) {
00079       currtok = nexttok+1;
00080       nexttok = strstr(currtok,"/");
00081     }
00082     execname = strdup(currtok);
00083   }
00084 
00085   // initialize mpi
00086   MPI_Init(&argc, &argv);
00087 
00088   // restore original executable name without absolute path
00089   strcpy(argv[0],execname);
00090 
00091   // determine the number of nodes running and my node number
00092   MPI_Comm_size(MPI_COMM_WORLD,&TotalNodes);
00093   MPI_Comm_rank(MPI_COMM_WORLD,&myHost);
00094 
00095   // make sure we do not have too many processes running
00096   if (procs > 0 && procs < TotalNodes) {
00097     // if this is a process that is beyond what we had requested, just exit
00098     if (myHost >= procs)
00099       Ippl::abort();
00100     TotalNodes = procs;
00101   }
00102 
00103   MPI_Type_size ( MPI_INT, &size_of_SHMEMPI_INT );
00104   if (myHost == 0) {    // this code is run by the master process
00105     // send a messages to each child node
00106     for (i = 1; i < TotalNodes; i++)
00107     {
00108       MPI_Send(&myHost, 1, MPI_INT, i, COMM_HOSTS_TAG, MPI_COMM_WORLD);
00109       TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, i, 1*size_of_SHMEMPI_INT);
00110     }
00111 
00112     // wait for the spawned processes to report back that they're ready
00113     int *child_ready = new int[TotalNodes];
00114     for (i = 0; i < TotalNodes; child_ready[i++] = 0);
00115     INFOMSG("CommSHMEMPI: Parent process waiting for children ..." << endl);
00116     reported = 1;               // since the parent is already ready
00117     while (reported < TotalNodes) {
00118       ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
00119           COMM_HOSTS_TAG, MPI_COMM_WORLD, &stat);
00120       TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, stat.MPI_SOURCE,
00121                         1*size_of_SHMEMPI_INT);
00122       if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host])) {
00123          child_ready[rep_host] = 1;
00124          reported++;
00125          INFOMSG("CommSHMEMPI: Child " << rep_host << " ready." << endl);
00126        }
00127        else {
00128          ERRORMSG("CommSHMEMPI: Error with child reporting to parent.  ");
00129          ERRORMSG("rep_host = " << rep_host);
00130          ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
00131        }
00132     }
00133 
00134     delete [] child_ready;
00135     INFOMSG("CommSHMEMPI: Initialization complete." << endl);
00136 
00137   }
00138   else {                        // this is a child process; get data from pops
00139     char host_name[MPI_MAX_PROCESSOR_NAME];
00140     ierror = MPI_Get_processor_name(host_name, &result_len);
00141     if (ierror >= 0) {
00142       INFOMSG("CommSHMEMPI: Started job " << myHost << " on host `");
00143       INFOMSG(host_name <<  "'." << endl);
00144     }
00145     else {
00146       ERRORMSG("CommSHMEMPI: failed" << endl);
00147     }
00148 
00149     // receive message from the master node
00150     int checknode;
00151     MPI_Recv(&checknode, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD,
00152              &stat);
00153     TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, 0, 1*size_of_SHMEMPI_INT);
00154     if (checknode != 0)
00155       WARNMSG("CommSHMEMPI: Child received bad message during startup." << endl);
00156 
00157     // send back an acknowledgement
00158     MPI_Send(&myHost, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD);
00159     TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, 0, 1*size_of_SHMEMPI_INT);
00160   }
00161 
00162   // set up the contexts and processes arrays properly
00163   if (TotalNodes > 1) {
00164     vector<int> proccount;
00165     proccount.push_back(1);
00166     for (i = 1; i < TotalNodes; i++) {
00167       Contexts.push_back(1);
00168       Processes.push_back(proccount);
00169     }
00170   }
00171 }
00172 
00173 
00175 // class destructor
00176 CommSHMEMPI::~CommSHMEMPI(void) {
00177   TAU_PROFILE("CommSHMEMPI::~CommSHMEMPI()", "void()", TAU_MESSAGE);
00178   int i, dieCode = 0;
00179   MPI_Status stat;
00180 
00181   // on all nodes, when running in parallel, get any extra messages not
00182   // yet received
00183   if (TotalNodes > 1) {
00184     int trial, node, tag;
00185     Message *msg;
00186     for (trial = 0; trial < 50000; ++trial) {
00187       do {
00188         node = COMM_ANY_NODE;
00189         tag = COMM_ANY_TAG;
00190         msg = myreceive(node, tag, COMM_SEND_TAG);
00191         if (msg != 0 && tag != IPPL_ABORT_TAG && tag != IPPL_EXIT_TAG) {
00192           WARNMSG("CommSHMEMPI: Found extra message from node " << node);
00193           WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
00194         }
00195       } while (msg != 0);
00196     }
00197   }
00198 
00199   // broadcast a message to all other nodes to tell them to quit
00200   if (myNode() == 0) {
00201     // on master node, send out messages
00202     for (i = 1; i < TotalNodes; i++)
00203     {
00204       MPI_Send(&dieCode, 1, MPI_INT, i, COMM_DIE_TAG, MPI_COMM_WORLD);
00205       TAU_TRACE_SENDMSG(COMM_DIE_TAG, i, 1*size_of_SHMEMPI_INT);
00206     }
00207   } else {
00208     // on client nodes, receive message
00209     MPI_Recv(&dieCode, 1, MPI_INT, 0, COMM_DIE_TAG, MPI_COMM_WORLD, &stat);
00210     TAU_TRACE_RECVMSG(COMM_DIE_TAG, 0, 1*size_of_SHMEMPI_INT);
00211   }
00212 
00213   MPI_Finalize();
00214 }
00215 
00216 
00218 // take the data from a Message object and pack it into the current send buf.
00219 // each message is packed in this order:
00220 //      tag, sending node, number of items             (3-int array)
00221 //              type of item 1  (short)
00222 //              size of item 1, in number of elements   (int)
00223 //              item 1 data     (various)
00224 //              ...
00225 //              type of item N  (short)
00226 //              size of item N, in number of elements   (int)
00227 //              item N data     (various)
00228 void *CommSHMEMPI::pack_message(Message *msg, int tag, int &buffsize) {
00229   TAU_PROFILE("CommSHMEMPI::pack_message()", "(Message *, int, int)",
00230               TAU_MESSAGE);
00231   // calculate size of buffer
00232   buffsize = find_msg_length(*msg);
00233 
00234   // allocate storage for buffer
00235   void *pos = (buffsize > PACKSIZE) ? makebuffer(buffsize) : shmempipackbuf;
00236 
00237   // pack message data and return the necessary pointer
00238   fill_msg_buffer(pos, *msg, tag, buffsize);
00239   return pos;
00240 }
00241 
00242 
00244 // send a message ... arguments are the Message itself, the
00245 // destination node, the 'user' tag, and the 'encoding' tag.
00246 // Messages should be sent via the underlying mechanism by using the
00247 // encoding tag (one of the COMM_ tags),
00248 // and should embed the information about what the user
00249 // tag is in the data sent between nodes.  Return success.
00250 bool CommSHMEMPI::mysend(Message *msg, int node, int tag, int etag) {
00251   TAU_PROFILE("CommSHMEMPI::mysend()", "bool (Message *, int, int, int)",
00252               TAU_MESSAGE);
00253 
00254   int nitems = msg->size(); 
00255   int errstat = (-1);
00256   int flag = false;
00257   MPI_Request request;
00258   MPI_Status status;
00259 
00260   MPI_Status rec_status;
00261   int src_node, rec_node, rec_tag, rec_size, rec_utag, bufid, rec_flag = 0;
00262   Message* newmsg = NULL;
00263 
00264   // pack the message data into the buffer
00265   int size;
00266   void *outbuffer = pack_message(msg, tag, size);
00267 
00268   // send the message (non-blocking)
00269   errstat = MPI_Isend(outbuffer, size, MPI_BYTE, node, etag,
00270                       MPI_COMM_WORLD, &request);
00271   TAU_TRACE_SENDMSG(etag, node, size);
00272 
00273   while (!flag) {
00274     // get info about messages to be received
00275     bufid = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
00276                        &rec_flag, &rec_status);
00277     if ( (bufid >= 0) && (rec_flag != 0) ) {
00278       // a message is available to be received
00279       src_node = rec_status.MPI_SOURCE;
00280       rec_tag = rec_status.MPI_TAG;
00281       MPI_Get_count(&rec_status, MPI_BYTE, &rec_size);
00282       if ( (rec_size >= 0) && (rec_tag >= 0) && (src_node >= 0) ) {
00283         // message is a valid one, so malloc the output buffer
00284         void *rec_buff = makebuffer(rec_size);
00285 
00286         // blocking receive, unpack message
00287         MPI_Recv(rec_buff, rec_size, MPI_BYTE, src_node, rec_tag,
00288                  MPI_COMM_WORLD, &rec_status);
00289         TAU_TRACE_RECVMSG(rec_tag, src_node, rec_size);
00290         newmsg = unpack_message(rec_node, rec_utag, rec_buff);
00291 
00292         // tell this new Message that we were the one that created its
00293         // storage buffer, so that when the Messageis deleted, we can
00294         // be told about it in order to free the storage.
00295         newmsg->useCommunicate(this, rec_buff);
00296 
00297         // put message in my message queue
00298         if (add_msg(newmsg,rec_node,rec_utag)) {
00299           newmsg = NULL; // reset message pointer
00300           rec_flag = 0; // reset receive flag
00301         }
00302       }
00303     }
00304 
00305     // check for completion of send
00306     MPI_Test(&request, &flag, &status);
00307   }
00308 
00309   //  free up the send buffer
00310   if (size > PACKSIZE)
00311     freebuffer(outbuffer);
00312 
00313   // return the success of the operation
00314   return (errstat == 0);
00315 }
00316 
00317 
00319 // receive a message from the given node and user tag.  Return a NEW
00320 // Message object if a message arrives, or NULL if no message available.
00321 // node will be set to the node from which the message was sent.
00322 // tag will be set to the 'user tag' for that message.
00323 // etag is the 'encoding' tag, and must be one of the COMM_ tags. 
00324 // Only message sent via the underlying mechanism with the
00325 // given etag are checked.  When one is found, the user tag and sending
00326 // node are extracted from the sent data.
00327 // If node = COMM_ANY_NODE, checks for messages from any node.
00328 // If tag = COMM_ANY_TAG, checks for messages with any user tag.
00329 Message *CommSHMEMPI::myreceive(int& node, int& tag, int etag) {
00330   TAU_PROFILE("CommSHMEMPI::myreceive()", "Message *(int, int, int)",
00331               TAU_MESSAGE);
00332 
00333   int bufid, size, checknode, checktag, flag = false;
00334   Message *newmsg = 0;
00335   MPI_Status stat;
00336  
00337   checknode = (node < 0 || node >= TotalNodes ? MPI_ANY_SOURCE : node);
00338   checktag = etag;
00339 
00340   // get info about message
00341   bufid = MPI_Iprobe(checknode, checktag, MPI_COMM_WORLD, &flag, &stat);
00342   if (bufid < 0) {
00343     // an error has occurred
00344     ERRORMSG("CommSHMEMPI: cannot receive msg from node " << checknode);
00345     ERRORMSG(", tag " << checktag << endl);
00346 
00347     if (MAX_SHMEMPI_ERRS > 0 && ++numErrors > MAX_SHMEMPI_ERRS) {
00348       ERRORMSG("Maximum number of MPI receive errors (" << numErrors);
00349       ERRORMSG(") exceeded. MPI is hosed!!" << endl);
00350       Ippl::abort();
00351     }
00352   }
00353 
00354   // if the message is actually available, see if we can get it now
00355   if (flag == true) {
00356     MPI_Get_count(&stat,MPI_BYTE,&size);
00357     if (size < 0) {
00358       ERRORMSG("CommSHMEMPI: received message has size " << size << endl);
00359     } else if ((stat.MPI_TAG != checktag) || (stat.MPI_TAG < 0)) {
00360       ERRORMSG("CommSHMEMPI: received message with invalid tag ");
00361       ERRORMSG(stat.MPI_TAG << endl);
00362     } else if (stat.MPI_SOURCE < 0) {
00363       ERRORMSG("CommSHMEMPI: received message from invalid source ");
00364       ERRORMSG(stat.MPI_SOURCE << endl);
00365     } else {
00366       checknode = stat.MPI_SOURCE;
00367       checktag = stat.MPI_TAG;
00368 
00369       // malloc the receive buffer
00370       void *outbuff = makebuffer(size);
00371 
00372       // blocking receive
00373       MPI_Recv(outbuff, size, MPI_BYTE, checknode, checktag,
00374                MPI_COMM_WORLD, &stat);
00375       TAU_TRACE_RECVMSG(checktag, checknode, size);
00376       newmsg = unpack_message(node, tag, outbuff);
00377 
00378         // tell this new Message that we were the one that created its
00379         // storage buffer, so that when the Messageis deleted, we can
00380         // be told about it in order to free the storage.
00381       newmsg->useCommunicate(this, outbuff);
00382       numErrors = 0;
00383     }
00384   } else {
00385     // no message is available
00386     DEBUGMSG(level2 << "CommSHMEMPI: No Message Received to Match Request");
00387     DEBUGMSG(endl);
00388   }
00389 
00390   // return the new Message, or NULL if no message available
00391   return newmsg;
00392 }
00393 
00394 
00396 // Synchronize all processors (everybody waits for everybody
00397 // else to get here before returning to calling function).
00398 // Uses MPI barrier for all procs 
00399 void CommSHMEMPI::mybarrier(void) {
00400   TAU_PROFILE("CommSHMEMPI::mybarrier()", "void ()", TAU_MESSAGE);
00401 
00402   MPI_Barrier(MPI_COMM_WORLD);
00403 }
00404 
00405 
00407 // clean up after a Message has been used (called by Message).
00408 void CommSHMEMPI::cleanupMessage(void *d) {
00409   TAU_PROFILE("CommSHMEMPI::cleanupMessage()", "void (void *)", TAU_MESSAGE);
00410 
00411   // need to free the allocated storage
00412   freebuffer(d);
00413 }
00414 
00415 
00416 /***************************************************************************
00417  * $RCSfile: CommSHMEMPI.cpp,v $   $Author: adelmann $
00418  * $Revision: 1.1.1.1 $   $Date: 2003/01/23 07:40:28 $
00419  * IPPL_VERSION_ID: $Id: CommSHMEMPI.cpp,v 1.1.1.1 2003/01/23 07:40:28 adelmann Exp $ 
00420  ***************************************************************************/

Generated on Mon Jan 16 13:23:51 2006 for IPPL by  doxygen 1.4.6