src/Message/CommPM.cpp

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 /***************************************************************************
00003  *
00004  * The IPPL Framework
00005  * 
00006  * This program was prepared by PSI. 
00007  * All rights in the program are reserved by PSI.
00008  * Neither PSI nor the author(s)
00009  * makes any warranty, express or implied, or assumes any liability or
00010  * responsibility for the use of this software
00011  *
00012  * Visit http://www.acl.lanl.gov/POOMS for more details
00013  *
00014  ***************************************************************************/
00015 
00016 // -*- C++ -*-
00017 /***************************************************************************
00018  *
00019  * The IPPL Framework
00020  * 
00021  *
00022  * Visit http://people.web.psi.ch/adelmann/ for more details
00023  *
00024  ***************************************************************************/
00025 
00026 // include files
00027 #include "Message/CommPM.h"
00028 #include "Message/Message.h"
00029 #include "Utility/IpplInfo.h"
00030 #include "Profile/Profiler.h"
00031 
00032 // include mpi header file
00033 #include <mpi.h>
00034 
// PM communication library interface.
// Pm.h is deliberately NOT included (KCC could not parse it).  Instead, the
// few declarations this file needs are copied from Pm.h below; they must be
// kept identical to the ones in Pm.h.
typedef void *pmCtx;   // opaque PM context, passed to every _pm* call

extern "C" {
  // table of allocated sub-networks (read by the CommPM constructor)
  extern pmCtx _pm_subnet[];
  extern int   _pm_subnet_count;

  // sending side
  int _pmGetSendBuf(pmCtx pmc, caddr_t *bufp, size_t length);
  int _pmSend(pmCtx pmc, int dst_node);
  int _pmSendDone(pmCtx pmc);

  // receiving side
  int _pmReceive(pmCtx pmc, caddr_t *bufp, size_t *length);
  int _pmPutReceiveBuf(pmCtx pmc);
}

// Packet-size constant as written in Pm.h.
// NOTE(review): not referenced by the code in this file; mysend does not
// check message lengths against it.
#define PM_MTU                  (8192 + 32)
00051 
00052 // include score header file. need to invoke sub-network functions.
00053 #include <score.h>
00054 
00055 #include <string.h>
00056 #include <stdlib.h>
00057 #include <unistd.h>
00058 
00059 // if an error occurs during myreceive more times than this, CommPM
00060 // will just exit.  Make it negative to totally disable checking for a
00061 // maximum number of errors
00062 #define MAX_MPI_ERRS    500
00063 
00064 
00065 // static data to keep track of errors
00066 static int size_of_MPI_INT; /* needed for tracing */
00067 
00068 // sub-network for message passing using PM directly.
00069 static pmCtx pm_network;
00070 
00072 // constructor.  arguments: command-line args, and number of processes
00073 // to start (if < 0, start the 'default' number, i.e. the number of
00074 // hosts in a MPI virtual machine, the number of nodes in an O2K, etc)
00075 // Note: The base-class constructor does not need the argument info or
00076 // the number of nodes, it just by default sets the number of nodes=1
00077 CommPM::CommPM(int& argc , char**& argv, int procs)
00078         : Communicate(argc, argv, procs) {
00079   TAU_PROFILE("CommPM::CommPM()", "void (int, char **, int)", TAU_MESSAGE);
00080   int i, reported, rep_host, ierror, result_len;
00081   MPI_Status stat;
00082   char *currtok, *nexttok, *execname;
00083 
00084   // a little "string magic" to strip the absolute pathname off the executable
00085   currtok = strstr(argv[0],"/");
00086   if (!currtok) {
00087     execname = strdup(argv[0]);
00088   } else {
00089     currtok++;
00090     nexttok = strstr(currtok,"/");
00091     while (nexttok) {
00092       currtok = nexttok+1;
00093       nexttok = strstr(currtok,"/");
00094     }
00095     execname = strdup(currtok);
00096   }
00097 
00098   // allocate a sub-network for message passing using PM directly.
00099   if (_score_alloc_subnet()) {
00100     ERRORMSG("CommPM: Error with allocating sub-network.");
00101   }
00102   pm_network = _pm_subnet[_pm_subnet_count - 1];
00103 
00104   // initialize mpi
00105   MPI_Init(&argc, &argv);
00106 
00107   // restore original executable name without absolute path
00108   strcpy(argv[0],execname);
00109 
00110   // determine the number of nodes running and my node number
00111   MPI_Comm_size(MPI_COMM_WORLD,&TotalNodes);
00112   MPI_Comm_rank(MPI_COMM_WORLD,&myHost);
00113 
00114   // make sure we do not have too many processes running
00115   if (procs > 0 && procs < TotalNodes) {
00116     // if this is a process that is beyond what we had requested, just exit
00117     if (myHost >= procs)
00118       Ippl::abort();
00119     TotalNodes = procs;
00120   }
00121 
00122   MPI_Type_size ( MPI_INT, (MPI_Aint*)&size_of_MPI_INT );
00123   if (myHost == 0) {    // this code is run by the master process
00124     // send a messages to each child node
00125     for (i = 1; i < TotalNodes; i++)
00126     {
00127       MPI_Send(&myHost, 1, MPI_INT, i, COMM_HOSTS_TAG, MPI_COMM_WORLD);
00128       TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, i, 1*size_of_MPI_INT);
00129     }
00130 
00131     // wait for the spawned processes to report back that they're ready
00132     int *child_ready = new int[TotalNodes];
00133     for (i = 0; i < TotalNodes; child_ready[i++] = 0);
00134     INFOMSG("CommPM: Parent process waiting for children ..." << endl);
00135     reported = 1;               // since the parent is already ready
00136     while (reported < TotalNodes) {
00137       ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
00138           COMM_HOSTS_TAG, MPI_COMM_WORLD, &stat);
00139       TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, stat.MPI_SOURCE, 1*size_of_MPI_INT);
00140       if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host])) {
00141          child_ready[rep_host] = 1;
00142          reported++;
00143          INFOMSG("CommPM: Child " << rep_host << " ready." << endl);
00144        }
00145        else {
00146          ERRORMSG("CommPM: Error with child reporting to parent.  ");
00147          ERRORMSG("rep_host = " << rep_host);
00148          ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
00149        }
00150     }
00151 
00152     delete [] child_ready;
00153     INFOMSG("CommPM: Initialization complete." << endl);
00154 
00155   }
00156   else {                        // this is a child process; get data from pops
00157     char host_name[MPI_MAX_PROCESSOR_NAME];
00158     ierror = MPI_Get_processor_name(host_name, &result_len);
00159     if (ierror >= 0) {
00160       INFOMSG("CommPM: Started job " << myHost << " on host `");
00161       INFOMSG(host_name <<  "'." << endl);
00162     }
00163     else {
00164       ERRORMSG("CommPM: failed" << endl);
00165     }
00166 
00167     // receive message from the master node
00168     int checknode;
00169     MPI_Recv(&checknode, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD,
00170              &stat);
00171     TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, 0, 1*size_of_MPI_INT);
00172     if (checknode != 0)
00173       WARNMSG("CommPM: Child received bad message during startup." << endl);
00174 
00175     // send back an acknowledgement
00176     MPI_Send(&myHost, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD);
00177     TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, 0, 1*size_of_MPI_INT);
00178   }
00179 
00180   // set up the contexts and processes arrays properly
00181   if (TotalNodes > 1) {
00182     vector<int> proccount;
00183     proccount.push_back(1);
00184     for (i = 1; i < TotalNodes; i++) {
00185       Contexts.push_back(1);
00186       Processes.push_back(proccount);
00187     }
00188   }
00189 
00190 }
00191 
00192 
00194 // class destructor
00195 CommPM::~CommPM(void) {
00196   TAU_PROFILE("CommPM::~CommPM()", "void()", TAU_MESSAGE);
00197   int i, dieCode = 0;
00198   MPI_Status stat;
00199 
00200   // on all nodes, when running in parallel, get any extra messages not
00201   // yet received
00202   if (TotalNodes > 1) {
00203     int trial, node, tag;
00204     Message *msg;
00205     for (trial = 0; trial < 50000; ++trial) {
00206       do {
00207         node = COMM_ANY_NODE;
00208         tag = COMM_ANY_TAG;
00209         msg = myreceive(node, tag, COMM_SEND_TAG);
00210         if (msg != 0) {
00211           WARNMSG("CommPM: Found extra message from node " << node);
00212           WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
00213         }
00214       } while (msg != 0);
00215     }
00216   }
00217 
00218   // broadcast a message to all other nodes to tell them to quit
00219   if (myNode() == 0) {
00220     // on master node, send out messages
00221     for (i = 1; i < TotalNodes; i++)
00222     {
00223       MPI_Send(&dieCode, 1, MPI_INT, i, COMM_DIE_TAG, MPI_COMM_WORLD);
00224       TAU_TRACE_SENDMSG(COMM_DIE_TAG, i, 1*size_of_MPI_INT);
00225     }
00226   } else {
00227     // on client nodes, receive message
00228     MPI_Recv(&dieCode, 1, MPI_INT, 0, COMM_DIE_TAG, MPI_COMM_WORLD, &stat);
00229     TAU_TRACE_RECVMSG(COMM_DIE_TAG, 0, 1*size_of_MPI_INT);
00230   }
00231 
00232   // release sub-network
00233   _score_free_subnet();
00234 
00235   MPI_Finalize();
00236 }
00237 
00238 
// Fixed-size header at the start of every PM packet built by mysend: the
// sender's rank, then the user tag.  The packed message data follows
// immediately after 'tag' (senders and receivers address it as
// &(msgbuf->tag) + 1), so the member order and types form the wire layout
// and must not change.
struct PM_Message {
  int node;   // rank of the sending node
  int tag;    // user tag given to mysend
};
00243 
#if 0
// DISABLED: compiled out and kept for reference only.
// NOTE(review): relies on PACKSIZE and mpipackbuf, which are not defined in
// this file; re-enabling it would require providing them.
//
// Take the data from a Message object and pack it into the current send
// buffer.  The packed layout (as described by the original author) is:
//      tag, sending node, number of items             (3-int array)
//              type of item 1  (short)
//              size of item 1, in number of elements   (int)
//              item 1 data     (various)
//              ...
//              type of item N  (short)
//              size of item N, in number of elements   (int)
//              item N data     (various)
// buffsize receives the packed length; the returned pointer is the buffer.
void *CommPM::pack_message(Message *msg, int tag, int &buffsize) {
  TAU_PROFILE("CommPM::pack_message()", "(Message *, int, int)",
              TAU_MESSAGE);
  buffsize = find_msg_length(*msg);

  // use the static pack buffer unless the message is too large for it
  void *target;
  if (buffsize > PACKSIZE)
    target = makebuffer(buffsize);
  else
    target = mpipackbuf;

  fill_msg_buffer(target, *msg, tag);
  return target;
}
#endif
00270 
00272 // send a message ... arguments are the Message itself, the
00273 // destination node, the 'user' tag, and the 'encoding' tag.
00274 // Messages should be sent via the underlying mechanism by using the
00275 // encoding tag (one of the COMM_ tags),
00276 // and should embed the information about what the user
00277 // tag is in the data sent between nodes.  Return success.
00278 bool CommPM::mysend(Message *msg, int node, int tag, int etag) {
00279   TAU_PROFILE("CommPM::mysend()", "bool (Message *, int, int, int)",
00280               TAU_MESSAGE);
00281 
00282   int nerr = 0;
00283 
00284   //  printf("mysend\n");
00285 
00286   // calculate size of buffer
00287   int length;
00288   length = find_msg_length(*msg) + sizeof(PM_Message);
00289 
00290   //  printf("length = %d\n", length);
00291 
00292   // allocate storage for buffer
00293   PM_Message* msgbuf;
00294   int timeout_counter = 0;
00295   while (_pmGetSendBuf(pm_network, (caddr_t*)&msgbuf, length) == ENOBUFS) {
00296     timeout_counter++;
00297     if (timeout_counter > 10000000) {
00298       ERRORMSG("CommPM: _pmGetSendBuf TIMEOUT");
00299       timeout_counter = 0;
00300       nerr++;
00301     }
00302     // receive a message when the buffer couldn't be allocated.
00303     nerr += pickup_message();
00304   }
00305 
00306   //  printf("buff %x is allocated\n", msgbuf);
00307   // pack message
00308   msgbuf->node = myHost;
00309   msgbuf->tag = tag;
00310   //  printf("invoke fill_msg_buffer\n");
00311   fill_msg_buffer((void*) (&(msgbuf->tag) + 1), *msg, tag);
00312   //  printf("fill_msg_buffer done\n");
00313 
00314   // send message
00315   if (_pmSend(pm_network, node)) {
00316     ERRORMSG("CommPM: _pmSend Error");
00317     nerr++;
00318   }
00319   // receive message waiting the sending done.
00320   while (_pmSendDone(pm_network) == EBUSY) {
00321     nerr += pickup_message();
00322   }
00323 
00324   //  printf("mysend done\n");
00325   // return the succsess of the operation
00326   return (nerr == 0);
00327 }
00328 
00329 int CommPM::pickup_message(void) {
00330   int nerr = 0;
00331   int length;
00332   PM_Message* msgbuf;
00333   int error;
00334   Message* newmsg = 0;
00335   // int rec_tag; 
00336   int src_node, rec_size, rec_utag;
00337   void* rec_buff;
00338 
00339   // pickup message
00340   if (error = _pmReceive(pm_network, (caddr_t*)&msgbuf, (size_t*)&length)) {
00341     // no message is received
00342     if (error != ENOBUFS) {
00343       ERRORMSG("CommPM: _pmReceive Error (in pickup_message)");
00344       nerr++;
00345     }
00346   } else {
00347     // a message is received, unpack it
00348     TAU_TRACE_RECVMSG(rec_tag, src_node, rec_size);
00349     src_node = msgbuf->node;
00350     // rec_tag = msgbuf->tag;
00351     rec_size = length - sizeof(PM_Message);
00352     rec_buff = makebuffer(rec_size);
00353     //    bcopy((void*) (&(msgbuf->tag) + 1), rec_buff, rec_size);
00354     memcpy(rec_buff, (void*) (&(msgbuf->tag) + 1), rec_size);
00355     _pmPutReceiveBuf(pm_network);
00356     newmsg = unpack_message(src_node, rec_utag, rec_buff);
00357     newmsg->useCommunicate(this, rec_buff);
00358 
00359     // put message in my message queue
00360     if (add_msg(newmsg, src_node, rec_utag)) {
00361       newmsg = NULL; // reset message pointer
00362     }
00363   }
00364 
00365   // return the number of errors;
00366   return nerr;
00367 }
00368 
00369 
00371 // receive a message from the given node and user tag.  Return a NEW
00372 // Message object if a message arrives, or NULL if no message available.
00373 // node will be set to the node from which the message was sent.
00374 // tag will be set to the 'user tag' for that message.
00375 // etag is the 'encoding' tag, and must be one of the COMM_ tags. 
00376 // Only message sent via the underlying mechanism with the
00377 // given etag are checked.  When one is found, the user tag and sending
00378 // node are extracted from the sent data.
00379 // If node = COMM_ANY_NODE, checks for messages from any node.
00380 // If tag = COMM_ANY_TAG, checks for messages with any user tag.
00381 Message *CommPM::myreceive(int& node, int& tag, int etag) {
00382   TAU_PROFILE("CommPM::myreceive()", "Message *(int, int, int)", TAU_MESSAGE);
00383   int error;
00384   PM_Message* msgbuf;
00385   int length;
00386   int rec_tag, src_node, rec_size, rec_utag;
00387   void* rec_buff;
00388   Message* newmsg = 0;
00389 
00390   // pickup message
00391   if (error = _pmReceive(pm_network, (caddr_t*)&msgbuf, (size_t*)&length)) {
00392     // no message is received
00393     if (error != ENOBUFS) {
00394       ERRORMSG("CommPM: _pmReceive Error (in myreceive)");
00395     } else {
00396       // no message is available
00397       DEBUGMSG(level2<<"CommPM: No Message Received to Match Request"<<endl);
00398     }
00399   } else {
00400     // a message is received, unpack it
00401     TAU_TRACE_RECVMSG(rec_tag, src_node, rec_size);
00402     src_node = msgbuf->node;
00403     rec_tag = msgbuf->tag;
00404     rec_size = length - sizeof(PM_Message);
00405     rec_buff = makebuffer(rec_size);
00406     //    bcopy((void*) (&(msgbuf->tag) + 1), rec_buff, rec_size);
00407     memcpy(rec_buff, (void*) (&(msgbuf->tag) + 1), rec_size);
00408     _pmPutReceiveBuf(pm_network);
00409     newmsg = unpack_message(src_node, rec_utag, rec_buff);
00410     newmsg->useCommunicate(this, rec_buff);
00411   }
00412 
00413   // Retrun the new Message, or NULL if no message available
00414   node = src_node;
00415   tag = rec_tag;
00416   return newmsg;
00417 }
00418 
00419 
00421 // Synchronize all processors (everybody waits for everybody
00422 // else to get here before returning to calling function).
00423 // Uses MPI barrier for all procs 
00424 void CommPM::mybarrier(void) {
00425   TAU_PROFILE("CommPM::mybarrier()", "void ()", TAU_MESSAGE);
00426 
00427   MPI_Barrier(MPI_COMM_WORLD);
00428 }
00429 
00430 
00432 // clean up after a Message has been used (called by Message).
00433 void CommPM::cleanupMessage(void *d) {
00434   TAU_PROFILE("CommPM::cleanupMessage()", "void (void *)", TAU_MESSAGE);
00435 
00436   // need to free the allocated storage
00437   freebuffer(d);
00438 }
00439 
00440 
00441 /***************************************************************************
00442  * $RCSfile: CommPM.cpp,v $   $Author: adelmann $
00443  * $Revision: 1.1.1.1 $   $Date: 2003/01/23 07:40:28 $
00444  * IPPL_VERSION_ID: $Id: CommPM.cpp,v 1.1.1.1 2003/01/23 07:40:28 adelmann Exp $ 
00445  ***************************************************************************/

Generated on Mon Jan 16 13:23:51 2006 for IPPL by  doxygen 1.4.6