src/Message/CommACLMPL.cpp

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 /***************************************************************************
00003  *
00004  * The IPPL Framework
00005  * 
00006  * This program was prepared by PSI. 
00007  * All rights in the program are reserved by PSI.
00008  * Neither PSI nor the author(s)
00009  * makes any warranty, express or implied, or assumes any liability or
00010  * responsibility for the use of this software
00011  *
00012  * Visit http://www.acl.lanl.gov/POOMS for more details
00013  *
00014  ***************************************************************************/
00015 
00016 // -*- C++ -*-
00017 /***************************************************************************
00018  *
00019  * The IPPL Framework
00020  * 
00021  *
00022  * Visit http://people.web.psi.ch/adelmann/ for more details
00023  *
00024  ***************************************************************************/
00025 
00026 // include files
00027 #include "Message/CommACLMPL.h"
00028 #include "Message/Message.h"
00029 #include "Profile/Profiler.h"
00030 #include "Utility/IpplInfo.h"
00031 #include "Utility/PAssert.h"
00032 
00033 #include <string.h>
00034 #include <stdlib.h>
00035 #include <unistd.h>
00036 
// Error limit for CommACLMPL.  The stated intent is that if an error occurs
// during myreceive more times than this, CommACLMPL will just exit, and that
// a negative value totally disables checking for a maximum number of errors.
// NOTE(review): nothing in this file reads MAX_ACLMPL_ERRS, and myreceive()
// below does not count errors -- confirm whether the header or another
// translation unit uses it, or whether this limit is simply not enforced.
#define MAX_ACLMPL_ERRS 500
00041 
00043 // constructor.  arguments: command-line args, and number of processes
00044 // to start (if < 0, start the 'default' number, i.e. the number of
00045 // hosts in a ACLMPL virtual machine, the number of nodes in an O2K, etc)
00046 // Note: The base-class constructor does not need the argument info or
00047 // the number of nodes, it just by default sets the number of nodes=1
00048 CommACLMPL::CommACLMPL(int argc, char** argv, int procs)
00049         : Communicate(argc, argv, procs) {
00050   TAU_PROFILE("CommACLMPL::CommACLMPL()", "void (int, char **, int)",
00051               TAU_MESSAGE);
00052   int i, rep_host, reported, ierror=0;
00053   // ACLMPL specific stuff...
00054   if (procs < 0) procs = 0;
00055   ACLMPL_initialize(procs);       
00056   myHost = (int) ACLMPL_self();
00057   TotalNodes = (int) ACLMPL_npes();
00058 
00059   // make sure we do not have too many processes running
00060   if (procs > 0 && procs < TotalNodes) {
00061     // if this is a process that is beyond what we had requested, just exit
00062     if (myHost >= procs)
00063       Ippl::abort();
00064     TotalNodes = procs;
00065   }
00066 
00067   int msize = wordround(sizeof(int));
00068   void *sendbuf = makebuffer(msize);
00069   void *recbuff = makebuffer(msize);
00070   void *pos;
00071 
00072   if (myHost == 0) { // master process
00073     // send a message to each child node
00074     pos = sendbuf;
00075     pack(&myHost, pos, sizeof(int));
00076     for (i=1; i<TotalNodes; i++) 
00077       ACLMPL_send(i, sendbuf, msize);
00078 
00079     // wait for child nodes to report ready
00080     INFOMSG("CommACLMPL: Parent process waiting for children ..." << endl);
00081     int *child_ready = new int[TotalNodes];
00082     for (i=0; i<TotalNodes; i++) child_ready[i] = 0;
00083     reported = 1; // parent is already ready
00084 
00085     while (reported<TotalNodes) {
00086       ierror = ACLMPL_recv(reported,recbuff,msize);
00087       if (ierror>0) {
00088         pos = recbuff;
00089         unpack(pos,&rep_host,sizeof(int));
00090         if (rep_host>0 && rep_host<TotalNodes && !(child_ready[rep_host])) {
00091           child_ready[rep_host] = 1;
00092           reported++;
00093           INFOMSG("CommACLMPL: Child " << rep_host << " ready." << endl);
00094         }
00095         else {
00096           ERRORMSG("CommACLMPL: Error with child reporting to parent.  ");
00097           ERRORMSG("rep_host = " << rep_host);
00098         }
00099         ierror = 0;
00100       }
00101       else {
00102         ERRORMSG("CommACLMPL: Message from child has size " << ierror << endl);
00103       }
00104     }
00105 
00106     delete [] child_ready;
00107     INFOMSG("CommACLMPL: Initialization complete." << endl);
00108   } else {
00109     // this is a child process; get data from parent
00110     int checknode;
00111     ierror = ACLMPL_recv(0, recbuff, msize);
00112     if (ierror>0) {
00113       pos = recbuff;
00114       unpack(pos,&checknode,sizeof(int));
00115       if (checknode != 0)
00116         WARNMSG("CommACLMPL: Child received bad message in startup." << endl);
00117     } else {
00118       WARNMSG("CommACLMPL: Child received message of size " << ierror << endl);
00119     }
00120 
00121     // send back an acknowledgement
00122     pos = sendbuf;
00123     pack(&myHost,pos,sizeof(int));
00124     ACLMPL_send(0,sendbuf,msize);
00125   }
00126 
00127   freebuffer(sendbuf);
00128   freebuffer(recbuff);
00129 
00130   // set up the contexts and processes arrays properly
00131   if (TotalNodes > 1) {
00132     vector<int> proccount;
00133     proccount.push_back(1);
00134     for (int i = 1; i < TotalNodes; i++) {
00135       Contexts.push_back(1);
00136       Processes.push_back(proccount);
00137     }
00138   }
00139 
00140 }
00141 
00142 
00144 // class destructor
00145 CommACLMPL::~CommACLMPL(void) {
00146   TAU_PROFILE("CommACLMPL::~CommACLMPL()", "void ()", TAU_MESSAGE);
00147 
00148   // on all nodes, when running in parallel, get any extra messages not
00149   // yet received
00150   if (TotalNodes > 1) {
00151     int trial, node, tag;
00152     Message *msg;
00153     for (trial = 0; trial < 50000; ++trial) {
00154       do {
00155         node = COMM_ANY_NODE;
00156         tag = COMM_ANY_TAG;
00157         msg = myreceive(node, tag, COMM_SEND_TAG);
00158         if (msg != 0 && tag != IPPL_ABORT_TAG && tag != IPPL_EXIT_TAG) {
00159           WARNMSG("CommACLMPL: Found extra message from node " << node);
00160           WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
00161         }
00162       } while (msg != 0);
00163     }
00164   }
00165 }
00166 
00167 
00169 // take the data from a Message object and pack it into the current send buf.
00170 // each message is packed in this order:
00171 //      tag, sending node, number of items             (3-int array)
00172 //              type of item 1  (int)
00173 //              size of item 1, in number of elements   (int)
00174 //              item 1 data     (various)
00175 //              ...
00176 //              type of item N  (int)
00177 //              size of item N, in number of elements   (int)
00178 //              item N data     (various)
00179 AsyncMsgInfo *
00180 CommACLMPL::pack_message(Message *msg, int tag, int &buffsize, int node) {
00181   TAU_PROFILE("CommACLMPL::pack_message()", "(Message *, int, int, int)",
00182               TAU_MESSAGE);
00183 
00184   // calculate size of buffer
00185   buffsize = find_msg_length(*msg);
00186 
00187   // allocate storage for buffer
00188   AsyncMsgInfo *outbuffer = ACLMPL_asend_alloc(buffsize);
00189   void *pos = ACLMPL_agetbuffer(outbuffer);
00190 
00191   // pack message data and return the necessary pointer
00192   fill_msg_buffer(pos, *msg, tag, buffsize, node);
00193   return outbuffer;
00194 }
00195 
00196 
00198 // send a message ... arguments are the Message itself, the
00199 // destination node, the 'user' tag, and the 'encoding' tag.
00200 // Messages should be sent via the underlying mechanism by using the
00201 // encoding tag (one of the COMM_ tags),
00202 // and should embed the information about what the user
00203 // tag is in the data sent between nodes.  Return success.
00204 bool CommACLMPL::mysend(Message* msg, int node, int tag, int etag) {
00205   TAU_TYPE_STRING(taustr, "bool (Message *, int, int, int)");
00206   TAU_PROFILE("CommACLMPL::mysend()", taustr, TAU_MESSAGE);
00207   TAU_PROFILE_TIMER(packtimer, "  ACLMPL::mysend::pack", taustr, TAU_MESSAGE);
00208   TAU_PROFILE_TIMER(sendtimer, "  ACLMPL::mysend::send", taustr, TAU_MESSAGE);
00209 
00210   // the size of the buffer we are creating
00211   int size = 0;
00212 
00213   // pack and send the message
00214   TAU_PROFILE_START(packtimer);
00215   AsyncMsgInfo *outbuffer = pack_message(msg, tag, size, node);
00216   TAU_PROFILE_STOP(packtimer);
00217 
00218   TAU_PROFILE_START(sendtimer);
00219   int sentsize = ACLMPL_alink(node, etag, outbuffer);
00220   TAU_PROFILE_STOP(sendtimer);
00221 
00222   // check for an error, and return
00223   if (sentsize == 0)
00224     ERRORMSG("CommACLMPL: Failure during asynchronous send!" << endl);
00225   return (sentsize != 0);
00226 }
00227 
00228 
00230 // receive a message from the given node and user tag.  Return a NEW
00231 // Message object if a message arrives, or NULL if no message available.
00232 // node will be set to the node from which the message was sent.
00233 // tag will be set to the 'user tag' for that message.
00234 // etag is the 'encoding' tag, and must be one of the COMM_ tags. 
00235 // Only message sent via the underlying mechanism with the
00236 // given etag are checked.  When one is found, the user tag and sending
00237 // node are extracted from the sent data.
00238 // If node = COMM_ANY_NODE, checks for messages from any node.
00239 // If tag = COMM_ANY_TAG, checks for messages with any user tag.
00240 Message* CommACLMPL::myreceive(int& node, int& tag, int etag) {
00241   TAU_TYPE_STRING(taustr, "Message *(int, int, int)" );
00242   TAU_PROFILE("CommACLMPL::myreceive()", taustr, TAU_MESSAGE);
00243   TAU_PROFILE_TIMER(findtimer, "  ACLMPL::myrec::find", taustr, TAU_MESSAGE);
00244   TAU_PROFILE_TIMER(infotimer, "  ACLMPL::myrec::getinfo",taustr, TAU_MESSAGE);
00245   TAU_PROFILE_TIMER(unpktimer, "  ACLMPL::myrec::unpack", taustr, TAU_MESSAGE);
00246 
00247   int checknode = (node < 0 || node >= TotalNodes ? ACLMPL_ANY_PE : node);
00248   int checktag = etag;
00249 
00250   // get info about message
00251   Message* newmsg = NULL;
00252   TAU_PROFILE_START(findtimer);
00253   AsyncMsgInfo *msg;
00254   msg = ACLMPL_afind(checknode, checktag);
00255   TAU_PROFILE_STOP(findtimer);
00256 
00257   if (msg != 0) {
00258     checknode = ACLMPL_agetsource(msg);
00259     checktag = ACLMPL_agettag(msg);
00260     //    int size = ACLMPL_agetsize(msg);
00261 
00262     TAU_PROFILE_START(unpktimer);
00263     // make a copy of the buffer
00264     //void *outbuff = makebuffer(size);
00265     //memcpy(outbuff, ACLMPL_agetbuffer(msg), size);
00266     void *outbuff = ACLMPL_agetbuffer(msg);
00267 
00268     // unpack the data from the buffer into a Message
00269     newmsg = unpack_message(node, tag, outbuff);
00270 
00271     if (newmsg == 0) {
00272       // there was a problem with the message body, so we just throw this
00273       // one away
00274       cleanupMessage(static_cast<void *>(msg));
00275     } else {
00276       // tell the Message object that we should be notified when the buffer
00277       // holding this data should be deallocated
00278       //newmsg->useCommunicate(this, outbuff);
00279       //ACLMPL_afree(msg);
00280       newmsg->useCommunicate(this, static_cast<void *>(msg));
00281     }
00282 
00283     TAU_PROFILE_STOP(unpktimer);
00284   }
00285   else {
00286     // no message available
00287     DEBUGMSG(level2 << "CommACLMPL: No Message Found to Match Request!" << endl);
00288   }
00289 
00290   // return the new Message, or NULL if no message available
00291   return newmsg;
00292 }
00293 
00294 
00296 // Synchronize all processors (everybody waits for everybody
00297 // else to get here before returning to calling function).
00298 // Uses ACLMPL barrier for all procs 
00299 void CommACLMPL::mybarrier(void) {
00300   TAU_PROFILE("CommACLMPL::mybarrier()", "void ()", TAU_MESSAGE);
00301 
00302   ACLMPL_barrier();
00303 }
00304 
00305 
00307 // resent a message buffer that has been previously packed and copied
00308 // into the provided buffer.  Return success.
00309 bool CommACLMPL::resend(void *buf, int buffsize, int node, int etag) {
00310   TAU_PROFILE("CommACLMPL::resend()", "void (void *, int, int, int)",
00311               TAU_MESSAGE);
00312 
00313   //Inform dbgmsg("CommACLMPL::resend", INFORM_ALL_NODES);
00314   //dbgmsg << "About to resend buffer of size " << buffsize << " to node ";
00315   //dbgmsg << node << " with etag = " << etag << endl;
00316 
00317   // this will only work if we're sending to another node
00318   PInsist(node != myNode(), "Can only retransmit to other nodes");
00319 
00320   // allocate storage for buffer
00321   AsyncMsgInfo *outbuffer = ACLMPL_asend_alloc(buffsize);
00322   void *pos = ACLMPL_agetbuffer(outbuffer);
00323 
00324   // copy the buffer into storage that can be used to send
00325   memcpy(pos, buf, buffsize);
00326 
00327   // now send the buffer
00328   int sentsize = ACLMPL_alink(node, etag, outbuffer);
00329 
00330   //dbgmsg << "Message resent; sentsize = " << sentsize << endl;
00331 
00332   // check for an error, and return
00333   if (sentsize == 0)
00334     ERRORMSG("CommACLMPL: Failure during asynchronous resend!" << endl);
00335   return (sentsize != 0);
00336 }
00337 
00338 
00340 // clean up after a Message has been used (called by Message).
00341 void CommACLMPL::cleanupMessage(void *d) {
00342   TAU_PROFILE("CommACLMPL::cleanupMessage()", "void (void *)", TAU_MESSAGE);
00343 
00344   // must free up the buffers from ACLMPL
00345   ACLMPL_afree(static_cast<AsyncMsgInfo *>(d));
00346 }
00347 
00348 
00349 /***************************************************************************
00350  * $RCSfile: CommACLMPL.cpp,v $   $Author: adelmann $
00351  * $Revision: 1.1.1.1 $   $Date: 2003/01/23 07:40:28 $
00352  * IPPL_VERSION_ID: $Id: CommACLMPL.cpp,v 1.1.1.1 2003/01/23 07:40:28 adelmann Exp $ 
00353  ***************************************************************************/

Generated on Mon Jan 16 13:23:51 2006 for IPPL by  doxygen 1.4.6