src/Message/CommMPI.cpp

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 /***************************************************************************
00003  *
00004  * The IPPL Framework
00005  * 
00006  * This program was prepared by PSI. 
00007  * All rights in the program are reserved by PSI.
00008  * Neither PSI nor the author(s)
00009  * makes any warranty, express or implied, or assumes any liability or
00010  * responsibility for the use of this software
00011  *
00012  * Visit http://www.acl.lanl.gov/POOMS for more details
00013  *
00014  ***************************************************************************/
00015 
00016 // -*- C++ -*-
00017 /***************************************************************************
00018  *
00019  * The IPPL Framework
00020  * 
00021  *
00022  * Visit http://people.web.psi.ch/adelmann/ for more details
00023  *
00024  ***************************************************************************/
00025 
00026 // include files
00027 #include "Message/CommMPI.h"
00028 #include "Message/Message.h"
00029 #include "Utility/IpplInfo.h"
00030 #include "Utility/PAssert.h"
00031 #include "Profile/Profiler.h"
00032 
00033 // include mpi header file
00034 #include <mpi.h>
00035 
00036 #include <string.h>
00037 #include <stdlib.h>
00038 #include <unistd.h>
00039 
00040 
// If an MPI probe error occurs in myreceive more times than this (counted
// since the last successful receive), CommMPI will call Ippl::abort().
// Make it negative to totally disable checking for a maximum number of errors.
#define MAX_MPI_ERRS    500


// Static data to keep track of errors: number of failed MPI_Iprobe calls in
// myreceive since the last successfully received message (reset to 0 there).
static int numErrors = 0;
#if defined(IPPL_MPIXX)
static MPI_Aint size_of_MPI_INT; /* needed for tracing; set via MPI_Type_size in the constructor */
#else
static int size_of_MPI_INT; /* needed for tracing; set via MPI_Type_size in the constructor */
#endif

// Temporary buffer used for speed: pack_message packs any message whose
// packed size is at most PACKSIZE bytes into this static buffer instead of
// allocating one with makebuffer().
// PSIZE is fully parenthesized so it expands correctly inside larger
// expressions (e.g. `x % PSIZE` or `n / PSIZE`).
#define PSIZE (1024*16)
#define PACKSIZE ((PSIZE)*sizeof(long))
static long mpipackbuf[PSIZE];
00059 
00060 
00061 
00063 // constructor.  arguments: command-line args, and number of processes
00064 // to start (if < 0, start the 'default' number, i.e. the number of
00065 // hosts in a MPI virtual machine, the number of nodes in an O2K, etc)
00066 // Note: The base-class constructor does not need the argument info or
00067 // the number of nodes, it just by default sets the number of nodes=1
00068 // The final argument indicates whether to run MPI_Init or not; IPPL
00069 // may be run as another user of MPI, in a context where MPI_Init has
00070 // already been called, in which case it can skip that step.
00071 CommMPI::CommMPI(int& argc , char**& argv, int procs, bool mpiinit)
00072   : Communicate(argc, argv, procs), weInitialized(mpiinit)
00073 {
00074   TAU_PROFILE("CommMPI::CommMPI()", "void (int, char **, int)", TAU_MESSAGE);
00075   int i, reported, rep_host, ierror, result_len;
00076   MPI_Status stat;
00077   char *currtok, *nexttok, *execname;
00078 
00079   // a little "string magic" to strip the absolute pathname off the executable
00080   currtok = strstr(argv[0],"/");
00081   if (!currtok) {
00082     execname = strdup(argv[0]);
00083   } else {
00084     currtok++;
00085     nexttok = strstr(currtok,"/");
00086     while (nexttok) {
00087       currtok = nexttok+1;
00088       nexttok = strstr(currtok,"/");
00089     }
00090     execname = strdup(currtok);
00091   }
00092 
00093   // initialize mpi
00094   if (weInitialized)
00095     MPI_Init(&argc, &argv);
00096 
00097   // restore original executable name without absolute path
00098   strcpy(argv[0],execname);
00099 
00100   // duplicate the MPI_COMM_WORLD communicator, so that we can use
00101   // a communicator that will not conflict with other users of MPI_COMM_WORLD
00102   MPI_Comm_dup(MPI_COMM_WORLD, &communicator);
00103 
00104   // determine the number of nodes running and my node number
00105   MPI_Comm_size(communicator,&TotalNodes);
00106   MPI_Comm_rank(communicator,&myHost);
00107 
00108   // make sure we do not have too many processes running
00109   if (procs > 0 && procs < TotalNodes) {
00110     // if this is a process that is beyond what we had requested, just exit
00111     if (myHost >= procs)
00112       Ippl::abort();
00113     TotalNodes = procs;
00114   }
00115 
00116   MPI_Type_size ( MPI_INT, &size_of_MPI_INT );
00117   if (myHost == 0) {    // this code is run by the master process
00118     // send a messages to each child node
00119     for (i = 1; i < TotalNodes; i++)
00120     {
00121       MPI_Send(&myHost, 1, MPI_INT, i, COMM_HOSTS_TAG, communicator);
00122       TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, i, 1*size_of_MPI_INT);
00123     }
00124 
00125     // wait for the spawned processes to report back that they're ready
00126     int *child_ready = new int[TotalNodes];
00127     for (i = 0; i < TotalNodes; child_ready[i++] = 0);
00128     INFOMSG("CommMPI: Parent process waiting for children ..." << endl);
00129     reported = 1;               // since the parent is already ready
00130     while (reported < TotalNodes) {
00131       ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
00132           COMM_HOSTS_TAG, communicator, &stat);
00133       TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, stat.MPI_SOURCE, 1*size_of_MPI_INT);
00134       if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host])) {
00135          child_ready[rep_host] = 1;
00136          reported++;
00137          INFOMSG("CommMPI: Child " << rep_host << " ready." << endl);
00138        }
00139        else {
00140          ERRORMSG("CommMPI: Error with child reporting to parent.  ");
00141          ERRORMSG("rep_host = " << rep_host);
00142          ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
00143        }
00144     }
00145 
00146     delete [] child_ready;
00147     INFOMSG("CommMPI: Initialization complete." << endl);
00148 
00149   }
00150   else {                        // this is a child process; get data from pops
00151     char host_name[MPI_MAX_PROCESSOR_NAME];
00152     ierror = MPI_Get_processor_name(host_name, &result_len);
00153     if (ierror >= 0) {
00154       INFOMSG("CommMPI: Started job " << myHost << " on host `");
00155       INFOMSG(host_name <<  "'." << endl);
00156     }
00157     else {
00158       ERRORMSG("CommMPI: failed" << endl);
00159     }
00160 
00161     // receive message from the master node
00162     int checknode;
00163     MPI_Recv(&checknode, 1, MPI_INT, 0, COMM_HOSTS_TAG, communicator,
00164              &stat);
00165     TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, 0, 1*size_of_MPI_INT);
00166     if (checknode != 0)
00167       WARNMSG("CommMPI: Child received bad message during startup." << endl);
00168 
00169     // send back an acknowledgement
00170     MPI_Send(&myHost, 1, MPI_INT, 0, COMM_HOSTS_TAG, communicator);
00171     TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, 0, 1*size_of_MPI_INT);
00172   }
00173 
00174   // set up the contexts and processes arrays properly
00175   if (TotalNodes > 1) {
00176     vector<int> proccount;
00177     proccount.push_back(1);
00178     for (i = 1; i < TotalNodes; i++) {
00179       Contexts.push_back(1);
00180       Processes.push_back(proccount);
00181     }
00182   }
00183 
00184 }
00185 
00186 
00188 // class destructor
00189 CommMPI::~CommMPI(void)
00190 {
00191   TAU_PROFILE("CommMPI::~CommMPI()", "void()", TAU_MESSAGE);
00192   int i, dieCode = 0;
00193   MPI_Status stat;
00194 
00195   // on all nodes, when running in parallel, get any extra messages not
00196   // yet received
00197   if (TotalNodes > 1) {
00198     int trial, node, tag;
00199     Message *msg;
00200     for (trial = 0; trial < 50000; ++trial) {
00201       do {
00202         node = COMM_ANY_NODE;
00203         tag = COMM_ANY_TAG;
00204         msg = myreceive(node, tag, COMM_SEND_TAG);
00205         if (msg != 0 && tag != IPPL_ABORT_TAG && tag != IPPL_EXIT_TAG) {
00206           WARNMSG("CommMPI: Found extra message from node " << node);
00207           WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
00208         }
00209       } while (msg != 0);
00210     }
00211   }
00212 
00213   // broadcast a message to all other nodes to tell them to quit
00214   if (myNode() == 0) {
00215     // on master node, send out messages
00216     for (i = 1; i < TotalNodes; i++)
00217     {
00218       MPI_Send(&dieCode, 1, MPI_INT, i, COMM_DIE_TAG, communicator);
00219       TAU_TRACE_SENDMSG(COMM_DIE_TAG, i, 1*size_of_MPI_INT);
00220     }
00221   } else {
00222     // on client nodes, receive message
00223     MPI_Recv(&dieCode, 1, MPI_INT, 0, COMM_DIE_TAG, communicator, &stat);
00224     TAU_TRACE_RECVMSG(COMM_DIE_TAG, 0, 1*size_of_MPI_INT);
00225   }
00226 
00227   // delete the communicator we used
00228   MPI_Comm_free(&communicator);
00229   // if we did our own initialization, also do finalize operation.  But
00230   // if we did not initialize, skip the finalize as well
00231   
00232   /* 
00233      ada: it make no sense to call finalize from tha application,
00234      because the IPPL destructor is calles AFTER that, which causes:
00235      "0032-151 MPI is already finalized in string, task number" 
00236   */
00237   // if (weInitialized)
00238   MPI_Finalize();
00239 }
00240 
00241 
00243 // take the data from a Message object and pack it into the current send buf.
00244 // each message is packed in this order:
00245 //      tag, sending node, number of items             (3-int array)
00246 //              type of item 1  (short)
00247 //              size of item 1, in number of elements   (int)
00248 //              item 1 data     (various)
00249 //              ...
00250 //              type of item N  (short)
00251 //              size of item N, in number of elements   (int)
00252 //              item N data     (various)
00253 void *CommMPI::pack_message(Message *msg, int tag, int &buffsize, int node) {
00254   TAU_PROFILE("CommMPI::pack_message()", "(Message *, int, int, int)",
00255               TAU_MESSAGE);
00256   // calculate size of buffer
00257   buffsize = find_msg_length(*msg);
00258 
00259   // allocate storage for buffer
00260   void *pos = (buffsize > PACKSIZE) ? makebuffer(buffsize) : mpipackbuf;
00261 
00262   // pack message data and return the necessary pointer
00263   fill_msg_buffer(pos, *msg, tag, buffsize, node);
00264   return pos;
00265 }
00266 
00267 
00269 // send a message ... arguments are the Message itself, the
00270 // destination node, the 'user' tag, and the 'encoding' tag.
00271 // Messages should be sent via the underlying mechanism by using the
00272 // encoding tag (one of the COMM_ tags),
00273 // and should embed the information about what the user
00274 // tag is in the data sent between nodes.  Return success.
00275 bool CommMPI::mysend(Message *msg, int node, int tag, int etag) {
00276   TAU_PROFILE("CommMPI::mysend()", "bool (Message *, int, int, int)",
00277               TAU_MESSAGE);
00278 
00279   int nitems = msg->size(); 
00280   int errstat = (-1);
00281   int flag = false;
00282   MPI_Request request;
00283   MPI_Status status;
00284 
00285   MPI_Status rec_status;
00286   int src_node, rec_node, rec_tag, rec_size, rec_utag, bufid, rec_flag = 0;
00287   Message* newmsg = NULL;
00288 
00289   // pack the message data into the buffer
00290   int size;
00291   void *outbuffer = pack_message(msg, tag, size, node);
00292 
00293   // send the message (non-blocking)
00294   Inform dbgmsg("CommMPI", INFORM_ALL_NODES);
00295   //dbgmsg << "Sending MPI message of size " << size << " to node " << node;
00296   //dbgmsg << " with tag " << tag << "." << endl;
00297   errstat = MPI_Isend(outbuffer, size, MPI_BYTE, node, etag,
00298                       communicator, &request);
00299   TAU_TRACE_SENDMSG(etag, node, size);
00300 
00301   while (!flag) {
00302     if (!Ippl::retransmit()) {
00303       // get info about messages to be received
00304       bufid = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, communicator,
00305                          &rec_flag, &rec_status);
00306       if ((bufid >= 0) && (rec_flag != 0) ) {
00307         // a message is available to be received
00308         src_node = rec_status.MPI_SOURCE;
00309         rec_tag = rec_status.MPI_TAG;
00310         MPI_Get_count(&rec_status, MPI_BYTE, &rec_size);
00311         //dbgmsg<<"Receiving MPI message of size " << rec_size << " from node ";
00312         //dbgmsg << src_node << "." << endl;
00313         if ( (rec_size >= 0) && (rec_tag >= 0) && (src_node >= 0) ) {
00314           // message is a valid one, so malloc the output buffer
00315           void *rec_buff = makebuffer(rec_size);
00316 
00317           // blocking receive, unpack message
00318           MPI_Recv(rec_buff, rec_size, MPI_BYTE, src_node, rec_tag,
00319                    communicator, &rec_status);
00320           TAU_TRACE_RECVMSG(rec_tag, src_node, rec_size);
00321           newmsg = unpack_message(rec_node, rec_utag, rec_buff);
00322 
00323           // if there was an error unpacking, then the message had a problem
00324           // and is invalid, so throw this one away
00325           if (newmsg == 0) {
00326             // free up the buffer
00327             cleanupMessage(rec_buff);
00328 
00329           } else {
00330             // tell the message to inform us when the buffer is finished
00331             newmsg->useCommunicate(this, rec_buff);
00332 
00333             // put message in my message queue
00334             add_msg(newmsg, rec_node, rec_utag);
00335           }
00336 
00337           // reset other receive information
00338           newmsg = NULL; // reset message pointer
00339           rec_flag = 0; // reset receive flag
00340         }
00341       }
00342     }
00343 
00344     // check for completion of send
00345     MPI_Test(&request, &flag, &status);
00346   }
00347 
00348   //  free up the send buffer
00349   if (size > PACKSIZE)
00350     freebuffer(outbuffer);
00351 
00352   // return the success of the operation
00353   return (errstat == 0);
00354 }
00355 
00356 
00358 // receive a message from the given node and user tag.  Return a NEW
00359 // Message object if a message arrives, or NULL if no message available.
00360 // node will be set to the node from which the message was sent.
00361 // tag will be set to the 'user tag' for that message.
00362 // etag is the 'encoding' tag, and must be one of the COMM_ tags. 
00363 // Only message sent via the underlying mechanism with the
00364 // given etag are checked.  When one is found, the user tag and sending
00365 // node are extracted from the sent data.
00366 // If node = COMM_ANY_NODE, checks for messages from any node.
00367 // If tag = COMM_ANY_TAG, checks for messages with any user tag.
00368 Message *CommMPI::myreceive(int& node, int& tag, int etag) {
00369   TAU_PROFILE("CommMPI::myreceive()", "Message *(int, int, int)", TAU_MESSAGE);
00370   int bufid, size, checknode, checktag, flag = false;
00371   Message *newmsg = 0;
00372   MPI_Status stat;
00373 
00374   checknode = (node < 0 || node >= TotalNodes ? MPI_ANY_SOURCE : node);
00375   checktag = etag;
00376 
00377   // get info about message
00378   bufid = MPI_Iprobe(checknode, checktag, communicator, &flag, &stat);
00379   if (bufid < 0) {
00380     // an error has occurred
00381     ERRORMSG("CommMPI: cannot receive msg from node " << checknode);
00382     ERRORMSG(", tag " << checktag << endl);
00383 
00384     if (MAX_MPI_ERRS > 0 && ++numErrors > MAX_MPI_ERRS) {
00385       ERRORMSG("Maximum number of MPI receive errors (" << numErrors);
00386       ERRORMSG(") exceeded. MPI is hosed!!" << endl);
00387       Ippl::abort();
00388     }
00389   }
00390 
00391   // if the message is actually available, see if we can get it now
00392   if (flag == true) {
00393     MPI_Get_count(&stat,MPI_BYTE,&size);
00394     if (size < 0) {
00395       ERRORMSG("CommMPI: received message has size " << size << endl);
00396     } else if ((stat.MPI_TAG != checktag) || (stat.MPI_TAG < 0)) {
00397       ERRORMSG("CommMPI: received message with invalid tag ");
00398       ERRORMSG(stat.MPI_TAG << endl);
00399     } else if (stat.MPI_SOURCE < 0) {
00400       ERRORMSG("CommMPI: received message from invalid source ");
00401       ERRORMSG(stat.MPI_SOURCE << endl);
00402     } else {
00403       checknode = stat.MPI_SOURCE;
00404       checktag = stat.MPI_TAG;
00405 
00406       // malloc the receive buffer
00407       void *outbuff = makebuffer(size);
00408 
00409       // blocking receive
00410       //Inform dbgmsg("CommMPI", INFORM_ALL_NODES);
00411       //dbgmsg << "Receiving MPI message of size " << size << " from node ";
00412       //dbgmsg << checknode << "." << endl;
00413       MPI_Recv(outbuff, size, MPI_BYTE, checknode, checktag,
00414                communicator, &stat);
00415       TAU_TRACE_RECVMSG(checktag, checknode, size);
00416       newmsg = unpack_message(node, tag, outbuff);
00417 
00418       // if there was an error unpacking, then the message had a problem
00419       // and is invalid, so throw this one away
00420       if (newmsg == 0) {
00421         // free up the buffer
00422         cleanupMessage(outbuff);
00423       } else {
00424         // tell the message to notify us when its done with the buffer
00425         newmsg->useCommunicate(this, outbuff);
00426       }
00427 
00428       // zero out the count of MPI-specific errors
00429       numErrors = 0;
00430     }
00431 
00432   } else {
00433     // no message is available
00434     DEBUGMSG(level2<<"CommMPI: No Message Received to Match Request"<<endl);
00435   }
00436 
00437   // return the new Message, or NULL if no message available
00438   return newmsg;
00439 }
00440 
00441 
00443 // Synchronize all processors (everybody waits for everybody
00444 // else to get here before returning to calling function).
00445 // Uses MPI barrier for all procs 
00446 void CommMPI::mybarrier(void) {
00447   TAU_PROFILE("CommMPI::mybarrier()", "void ()", TAU_MESSAGE);
00448 
00449   MPI_Barrier(communicator);
00450 }
00451 
00452 
00454 // resent a message buffer that has been previously packed and copied
00455 // into the provided buffer.  Return success.
00456 bool CommMPI::resend(void *buf, int buffsize, int node, int etag) {
00457   TAU_PROFILE("CommMPI::resend()", "void (void *, int, int, int)",
00458               TAU_MESSAGE);
00459 
00460   //Inform dbgmsg("CommMPI::resend", INFORM_ALL_NODES);
00461   //dbgmsg << "About to resend buffer of size " << buffsize << " to node ";
00462   //dbgmsg << node << " with etag = " << etag << endl;
00463 
00464   // this will only work if we're sending to another node
00465   PInsist(node != myNode(), "Can only retransmit to other nodes");
00466 
00467   // send the buffer out
00468   MPI_Request request;
00469   int errstat = MPI_Isend(buf, buffsize, MPI_BYTE, node, etag,
00470                           communicator, &request);
00471   TAU_TRACE_SENDMSG(etag, node, size);
00472 
00473   int flag = false;
00474   MPI_Status status;
00475   while (!flag) {
00476     // check for completion of send
00477     MPI_Test(&request, &flag, &status);
00478   }
00479 
00480   // return the success of the operation
00481   return (errstat == 0);
00482 }
00483  
00484 
00486 // clean up after a Message has been used (called by Message).
00487 void CommMPI::cleanupMessage(void *d) {
00488   TAU_PROFILE("CommMPI::cleanupMessage()", "void (void *)", TAU_MESSAGE);
00489 
00490   // need to free the allocated storage
00491   freebuffer(d);
00492 }
00493 
00494 
00495 /***************************************************************************
00496  * $RCSfile: CommMPI.cpp,v $   $Author: adelmann $
00497  * $Revision: 1.1.1.1 $   $Date: 2003/01/23 07:40:28 $
00498  * IPPL_VERSION_ID: $Id: CommMPI.cpp,v 1.1.1.1 2003/01/23 07:40:28 adelmann Exp $ 
00499  ***************************************************************************/

Generated on Mon Jan 16 13:23:51 2006 for IPPL by  doxygen 1.4.6