src/Message/CommPVM.cpp

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 /***************************************************************************
00003  *
00004  * The IPPL Framework
00005  * 
00006  * This program was prepared by PSI. 
00007  * All rights in the program are reserved by PSI.
00008  * Neither PSI nor the author(s)
00009  * makes any warranty, express or implied, or assumes any liability or
00010  * responsibility for the use of this software
00011  *
00012  * Visit http://www.acl.lanl.gov/POOMS for more details
00013  *
00014  ***************************************************************************/
00015 
00016 // -*- C++ -*-
00017 /***************************************************************************
00018  *
00019  * The IPPL Framework
00020  * 
00021  *
00022  * Visit http://people.web.psi.ch/adelmann/ for more details
00023  *
00024  ***************************************************************************/
00025 
00026 /*
00027  * CommPVM.cpp - member functions for the PVM-specific Communicate object,
00028  * which handles node-to-node communication in a parallel environment
00029  * using the PVM library.
00030  */
00031 
00032 // include files
00033 #include "Message/CommPVM.h"
00034 #include "Message/Message.h"
00035 #include "Utility/IpplInfo.h"
00036 #include "Profile/Profiler.h"
00037 
00038 // include pvm header file
00039 #include <pvm3.h>
00040 
00041 #include <string.h>
00042 #include <stdlib.h>
00043 #include <unistd.h>
00044 
00045 
// Number of myreceive errors tolerated before CommPVM gives up and
// terminates the program.  A value that is not positive switches the
// limit check off completely.
static const int MAX_PVM_ERRS = 500;


// element type used when building arrays of C strings (argument lists)
typedef char *charptr;

// running count of receive errors, compared against MAX_PVM_ERRS
static int numErrors = 0;
00057 
00058 
00060 // constructor.  Starts up the virtual machine processes.
00061 //      arguments: command-line args, and number of processes
00062 // to start (if < 0, start the 'default' number, i.e. the number of
00063 // hosts in a PVM virtual machine, the number of nodes in an O2K, etc)
00064 // Note: The base-class constructor does not need the argument info or
00065 // the number of nodes, it just by default sets the number of nodes=1
00066 CommPVM::CommPVM(int argc , char** argv, int procs)
00067   : Communicate(argc, argv, procs) {
00068   TAU_PROFILE("CommPVM::CommPVM()", "void (int, char **, int)", TAU_MESSAGE);
00069   charptr *useargs = argv;
00070 
00071   // PVM specific stuff...
00072   int i, startNum, mytid, parent_tid, testnumhosts, added;
00073   int *child_ready, reported, rep_host, rep_tid;
00074   charptr *spawnargv = 0;
00075 
00076   // initialize useful info
00077   execName = NULL;
00078   tids = NULL;
00079 
00080   // Following group enrollment of everybody needed for barrier implement.:
00081   int ErrRet = pvm_joingroup("all_tasks");
00082   if (ErrRet==PvmSysErr) {
00083     ERRORMSG("CommPVM: pvmd not running!!" << endl);
00084   }
00085   else if (ErrRet==PvmBadParam) {
00086     ERRORMSG("CommPVM: bad group name!!" << endl);
00087   }
00088   else if (ErrRet==PvmDupGroup) {
00089     ERRORMSG("CommPVM: already joined this group!!" << endl);
00090   }
00091 
00092   // determine my tid number, and if this process is the parent process
00093   mytid = pvm_mytid();
00094   if(mytid >= 0) {
00095     // PVM is indeed running, get info about the virtual machine.
00096 
00097     //  Account for old PVM version on Exemplar
00098     struct pvmhostinfo *hostp;
00099     struct pvmtaskinfo *taskp;
00100     parent_tid = pvm_parent();
00101     pvm_setopt(PvmRoute, PvmRouteDirect);
00102 
00103     // get information about the virtual machine.
00104     // used to set number of hosts, and to spawn processes if this is the
00105     // master node.
00106     int narch;
00107     if(pvm_config(&TotalNodes, &narch, &hostp) != 0) {
00108       // error getting config info; go back to being single-node machine
00109       WARNMSG("CommPVM: cannot get virtual machine configuration.\n");
00110       WARNMSG("         Going to single-node status." << endl);
00111       TotalNodes = 1;
00112       parent_tid = (-1);
00113     }
00114 
00115     // print out the arguments being used, and make copy of executable name
00116     char *exnmptr = strrchr(useargs[0],'/');
00117     if(!exnmptr)
00118       exnmptr = useargs[0];
00119     else
00120       exnmptr++;
00121     execName = new char[strlen(exnmptr) + 1];
00122     strcpy(execName,exnmptr);
00123 
00124     // determine the proper number of processes to start.  We usually start
00125     // one process per node.  If there is just one node, and proc > 1, we
00126     // start 'proc' proceses on the same node.  Finally, if proc > 1 but
00127     // less than the total number of nodes, we start processes on just 'proc'
00128     // nodes (for some arch's, this may mean letting some nodes sit idle)
00129     if(procs > 0)
00130       startNum = procs;
00131     else
00132       startNum = TotalNodes;
00133 
00134     // make sure, if we are only starting one node
00135     if(startNum == 1)
00136       TotalNodes = 1;
00137 
00138     // if this is the first PVM communication object, add hosts to system
00139     tids = new int[startNum];
00140     if (parent_tid < 0) {
00141       // this is the parent process ... spawn others if necessary
00142       tids[0] = mytid;
00143       added = 1;
00144       myHost = 0;
00145       if(startNum > 1) {
00146         // For a 'normal' PVM setup, we start a job on each host, unless
00147         // we are starting up all the processes on the same node
00148 
00149         // first get the argument list
00150         if (argc > 1) {
00151           spawnargv = new charptr[argc];
00152           for(i=0; i < (argc - 1); i++)
00153             spawnargv[i] = useargs[i+1];
00154           spawnargv[argc - 1] = NULL;
00155         }
00156 
00157         // now start a job on each host (may not use ALL the hosts ...)
00158         // find the parent pvmd tid, so we do not start another job there
00159         int ntask;
00160         pvm_tasks(0, &ntask, &taskp);
00161         int pvmd_tid = (taskp[0]).ti_host;
00162         int checked_host_once = 0;
00163 
00164         // go to each host and spawn a process; do this round-robin
00165         int pn;
00166         for (i=0, pn=0; pn < startNum && added < startNum; pn++) {
00167           if (checked_host_once == 0 && pvmd_tid == (hostp[i]).hi_tid) {
00168             checked_host_once = 1;
00169           }
00170           else {
00171             INFOMSG("CommPVM: Starting job " << added + 1 << " on host '");
00172             INFOMSG((hostp[i]).hi_name << "' ... ");
00173             pvm_spawn(execName, spawnargv, PvmTaskHost,
00174                       (hostp[i]).hi_name, 1, tids + added);
00175             if(tids[added] < 0) {
00176               ERRORMSG("failed.  PVM Error = " << tids[added] << endl);
00177             }
00178             else {
00179               INFOMSG("new tid = " << tids[added] << endl);
00180               added++;
00181             }
00182           }
00183           i = ((i + 1) % TotalNodes);
00184         }
00185 
00186         // free storage used for sending out arguments
00187         delete [] spawnargv;
00188 
00189         // adjust TotalNodes to the actual number of hosts which responded
00190         TotalNodes = startNum = added;
00191 
00192         pvm_initsend(PvmDataRaw);
00193         pvm_pkint(&TotalNodes,1,1);
00194         pvm_pkint(tids,TotalNodes,1);
00195         pvm_mcast(tids+1,TotalNodes-1,COMM_HOSTS_TAG);
00196 
00197         pvm_initsend(PvmDataRaw);
00198 
00199         // wait for the spawned processes to report back that they're ready
00200         INFOMSG("CommPVM: Parent process waiting for children ..." << endl);
00201         child_ready = new int[TotalNodes];
00202         for(i=0; i < TotalNodes; child_ready[i++] = 0);
00203         reported = 1;           // since the parent is already ready
00204         while(reported < TotalNodes) {
00205           if(pvm_nrecv((-1),COMM_HOSTS_TAG) > 0) {
00206             pvm_upkint(&rep_host,1,1);
00207             pvm_upkint(&rep_tid,1,1);
00208             if(rep_host>=0 &&rep_host<TotalNodes &&!(child_ready[rep_host])) {
00209               child_ready[rep_host] = 1;
00210               reported++;
00211               INFOMSG("CommPVM: Child " << rep_host << " (tid = " << rep_tid);
00212               INFOMSG(") ready." << endl);
00213             }
00214             else {
00215               ERRORMSG("CommPVM: Error with child reporting to parent.  ");
00216               ERRORMSG("rep_host = " << rep_host << ", rep_tid = " << rep_tid);
00217               ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
00218             }
00219           }
00220         }
00221         delete [] child_ready;
00222         INFOMSG("CommPVM: Initialization complete!" << endl);
00223       }
00224 
00225     }
00226     else {              // this is a child process; get data from pops
00227       // get message from parent
00228       pvm_recv(parent_tid, COMM_HOSTS_TAG);
00229       pvm_upkint(&testnumhosts,1,1);
00230       if(testnumhosts != startNum) {
00231         WARNMSG("CommPVM: For tid " << mytid);
00232         WARNMSG(": Adjusting # of nodes from " << startNum << " to ");
00233         WARNMSG(testnumhosts << endl);
00234         startNum = testnumhosts;
00235       }
00236       pvm_upkint(tids, startNum, 1);
00237       TotalNodes = startNum;
00238 
00239       // find my host index
00240       for(i=1; i < startNum; i++)
00241         if(mytid == tids[i]) { myHost = i; break; }
00242 
00243       // check to see if we still need to be executing
00244       if(i >= startNum) {
00245         // this is a node which is not needed, so just exit for now
00246         pvm_exit();
00247       }
00248 
00249       // reply back to the parent we're ready to go
00250       pvm_initsend(PvmDataRaw);
00251       pvm_pkint(&myHost,1,1);
00252       pvm_pkint(&mytid,1,1);
00253       pvm_send(parent_tid,COMM_HOSTS_TAG);
00254     }
00255 
00256   }
00257   else {                // PVM is not running; make this a single node
00258     TotalNodes = 1;
00259     WARNMSG("CommPVM: PVM not running, going to single-node mode." << endl);
00260   }
00261 
00262   // set up the contexts and processes arrays properly
00263   if (TotalNodes > 1) {
00264     vector<int> proccount;
00265     proccount.push_back(1);
00266     for (i = 1; i < TotalNodes; i++) {
00267       Contexts.push_back(1);
00268       Processes.push_back(proccount);
00269     }
00270   }
00271 }
00272 
00273 
00275 // class destructor
00276 CommPVM::~CommPVM(void) {
00277   TAU_PROFILE("CommPVM::~CommPVM()", "void ()", TAU_MESSAGE);
00278 
00279   int i;                        // loop variable
00280 
00281   // on all nodes, when running in parallel, get any extra messages not
00282   // yet received
00283   if (TotalNodes > 1) {
00284     int trial, node, tag;
00285     Message *msg;
00286     for (trial = 0; trial < 50000; ++trial) {
00287       do {
00288         node = COMM_ANY_NODE;
00289         tag = COMM_ANY_TAG;
00290         msg = myreceive(node, tag, COMM_SEND_TAG);
00291         if (msg != 0) {
00292           WARNMSG("CommPVM: Found extra message from node " << node);
00293           WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
00294         }
00295       } while (msg != 0);
00296     }
00297   }
00298 
00299   // if this is the last active CommunicatePVM object, clean up
00300   if (TotalNodes > 1) {
00301     // perform barrier to get all nodes ready to die
00302     mybarrier();
00303 
00304     // if this is the parent process, kill others
00305     if (myHost == 0) {
00306       INFOMSG("CommPVM: Parent process killing spawned processes ..." << endl);
00307       for (i = 1 ; i < TotalNodes; i++)
00308         pvm_kill(tids[i]);
00309 
00310       pvm_exit();               // and then exit the parent from pvm
00311       if(tids)
00312         delete [] tids;
00313       if(execName)
00314         delete [] execName;
00315 
00316     } else {
00317       // on client nodes, just wait to die
00318       if(tids)
00319         delete [] tids;
00320       if(execName)
00321         delete [] execName;
00322 
00323       while(1)
00324         sleep(1);               // just sit and wait for the end
00325     }
00326   }
00327 }
00328 
00329 
00331 // take the data from a Message object and pack it into the current send buf.
00332 // each message is packed in this order:
00333 //      tag, sending node, number of items             (3-int array)
00334 //              type of item 1  (short)
00335 //              size of item 1, in number of elements   (int)
00336 //              item 1 data     (various)
00337 //              ...
00338 //              type of item N  (short)
00339 //              size of item N, in number of elements   (int)
00340 //              item N data     (various)
00341 void CommPVM::pack_message(Message *msg, int tag) {
00342   TAU_PROFILE("CommPVM::pack_message()", "void (Message *, int)", TAU_MESSAGE);
00343   int mdata[3];                 //  array to store msg header info
00344   int nitems = msg->size();     //  Number of items in Message
00345 
00346   // put message header info into the buffer
00347   mdata[0] = tag;
00348   mdata[1] = myNode();
00349   mdata[2] = nitems;
00350   pvm_pkint(mdata, 3, 1);        // tag, sender, number of items
00351  
00352   // scan through the message items, and pack their data
00353   Message::iterator mbeg = msg->begin();
00354   Message::iterator mend = msg->end();
00355   for ( ; mbeg != mend; ++mbeg) {
00356     Message::MsgItem *msg = *mbeg;
00357     mdata[0] = msg->elemsize;
00358     mdata[1] = msg->bytesize;
00359     pvm_pkint(mdata, 2, 1);
00360     if (mdata[1] > 0)
00361       pvm_pkbyte((char *)(msg->item), mdata[1], 1);
00362   }
00363 }
00364 
00365 
00367 // unpack the current receive message into a Message,
00368 // and look for a message with the given tag.  If a message is found which
00369 // does not match the search criteria, it is put in the received queue.
00370 // messages are in the format described above for 'pack_message'.
00371 Message* CommPVM::unpack_message(int &node, int &tag) {
00372   TAU_PROFILE("CommPVM::unpack_message()", "Message * (int, int)", TAU_MESSAGE);
00373   int mdata[3];                 // array to store msg header info
00374   int j, nitems;                // loop variable, and # items in msg
00375 
00376   // get the tag, sender, and number of messages
00377   pvm_upkint(mdata, 3, 1);
00378   tag = mdata[0];
00379   node = mdata[1];
00380   nitems = mdata[2];
00381 
00382   // create data structure for this message
00383   Message *newmsg = new Message;
00384 
00385   // get all the items and add to the message
00386   for (j = 0; j < nitems; j++) {
00387     pvm_upkint(mdata, 2, 1);
00388     int elemsize = mdata[0];
00389     int bytesize = mdata[1];
00390 
00391     // for each item, allocate storage, and give to Message object as
00392     // storage that should NOT be copied, and that SHOULD be freed up
00393     // when the Message is deleted.
00394     if ( bytesize > 0  && elemsize > 0 ) {
00395       //      char *itemdata = new char[bytesize];
00396       // use malloc instead
00397       char* itemdata = (char*) malloc(bytesize);
00398       pvm_upkbyte(itemdata, bytesize, 1);
00399       newmsg->setCopy(false);
00400       newmsg->setDelete(true);
00401       newmsg->putmsg((void *)itemdata, elemsize, bytesize / elemsize );
00402     }
00403   }
00404 
00405   // return the new message
00406   return newmsg;
00407 }
00408 
00409 
00411 // send a message ... arguments are the Message itself, the
00412 // destination node, the 'user' tag, and the 'encoding' tag.
00413 // Messages should be sent via the underlying mechanism by using the
00414 // encoding tag (one of the COMM_ tags),
00415 // and should embed the information about what the user
00416 // tag is in the data sent between nodes.  Return success.
00417 bool CommPVM::mysend(Message *msg, int node, int tag, int etag) {
00418   TAU_PROFILE("CommPVM::mysend()", "bool (Message *, int, int, int)", TAU_MESSAGE);
00419   int newbufid, errstat = (-1);
00420 
00421   // create a new buffer for the message
00422   newbufid = pvm_initsend(PvmDataRaw);
00423   if ( newbufid >= 0 )    {
00424     // pack the message data into the buffer
00425     pack_message(msg, tag);
00426 
00427     // send the message
00428     errstat = pvm_send(tids[node], etag);
00429   }
00430   else {
00431     ERRORMSG("CommPVM: Error in pvm_initsend = " << newbufid << endl);
00432     return false;
00433   }
00434 
00435   // return the success of the operation
00436   return (errstat == 0);
00437 }
00438 
00439 
00441 // receive a message from the given node and user tag.  Return a NEW
00442 // Message object if a message arrives, or NULL if no message available.
00443 // node will be set to the node from which the message was sent.
00444 // tag will be set to the 'user tag' for that message.
00445 // etag is the 'encoding' tag, and must be one of the COMM_ tags. 
00446 // Only message sent via the underlying mechanism with the
00447 // given etag are checked.  When one is found, the user tag and sending
00448 // node are extracted from the sent data.
00449 // If node = COMM_ANY_NODE, checks for messages from any node.
00450 // If tag = COMM_ANY_TAG, checks for messages with any user tag.
00451 Message *CommPVM::myreceive(int& node, int& tag, int etag) {
00452   TAU_PROFILE("CommPVM::myreceive()", "Message * (int, int, int)", TAU_MESSAGE);
00453   int bufid, size, checknode, checktag;
00454   Message *newmsg = NULL;
00455 
00456   checknode = (node < 0 || node >= TotalNodes ? (-1) : tids[node]);
00457   checktag = etag;
00458 
00459   // clear out any current message
00460   if ((bufid = pvm_getrbuf()) > 0)
00461     pvm_freebuf(bufid);
00462 
00463   // get the message, if available
00464   bufid = pvm_probe(checknode, checktag);
00465 
00466   // check if there really was a message
00467   if (bufid > 0) {
00468 
00469     // yes there was .. get info about message
00470     if (pvm_bufinfo(bufid, &size, &checktag, &checknode) < 0) {
00471       ERRORMSG("CommPVM: myreceive shows msg, but ");
00472       ERRORMSG("cannot get info about this buffer." << endl);
00473       ERRORMSG("   id = " << bufid << endl);
00474     }
00475     else if (size <= 0) {
00476       ERRORMSG("CommPVM: received message has size " << size << endl);
00477     }
00478     else {
00479       // we have a valid message.  break it up into Messages,
00480       // and look for a message with a matching tag.
00481       bufid = pvm_recv(checknode, checktag);
00482       newmsg = unpack_message(node, tag);
00483       pvm_freebuf(bufid);
00484       numErrors = 0;
00485     }
00486 
00487   }
00488   else if (bufid < 0) {
00489     // an error has occurred
00490     ERRORMSG("CommPVM: myreceive cannot receive msg from node " << node);
00491     ERRORMSG(", tag " << tag << endl);
00492 
00493     if (MAX_PVM_ERRS > 0 && ++numErrors > MAX_PVM_ERRS) {
00494       ERRORMSG("Maximum number of PVM receive errors (" << numErrors);
00495       ERRORMSG(") exceeded.  PVM is hosed!!" << endl);
00496       exit(1);
00497     }
00498   }
00499 
00500   // return the message if it was found, or NULL if no message
00501   return newmsg;
00502 }
00503 
00504 
00506 // Synchronize all processors (everybody waits for everybody
00507 // else to get here before returning to calling function).
00508 // Uses PVM barrier for all procs.
00509 void CommPVM::mybarrier(void) {
00510   TAU_PROFILE("CommPVM::mybarrier()", "void ()", TAU_MESSAGE);
00511   pvm_barrier("all_tasks", TotalNodes);
00512   return;
00513 }
00514 /***************************************************************************
00515  * $RCSfile: CommPVM.cpp,v $   $Author: adelmann $
00516  * $Revision: 1.1.1.1 $   $Date: 2003/01/23 07:40:28 $
00517  * IPPL_VERSION_ID: $Id: CommPVM.cpp,v 1.1.1.1 2003/01/23 07:40:28 adelmann Exp $ 
00518  ***************************************************************************/

Generated on Mon Jan 16 13:23:51 2006 for IPPL by  doxygen 1.4.6