src/Message/Communicate.cpp

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 /***************************************************************************
00003  *
00004  * The IPPL Framework
00005  * 
00006  * This program was prepared by PSI. 
00007  * All rights in the program are reserved by PSI.
00008  * Neither PSI nor the author(s)
00009  * makes any warranty, express or implied, or assumes any liability or
00010  * responsibility for the use of this software
00011  *
00012  * Visit http://www.acl.lanl.gov/POOMS for more details
00013  *
00014  ***************************************************************************/
00015 
00016 // -*- C++ -*-
00017 /***************************************************************************
00018  *
00019  * The IPPL Framework
00020  * 
00021  *
00022  * Visit http://people.web.psi.ch/adelmann/ for more details
00023  *
00024  ***************************************************************************/
00025 
00027 // Communicate - common member functions for Communicate object.
00028 // On-node traffic
00029 // is handled here and architecture-specific routines are called for off-node
00030 // traffic.  This is the base class for all comm-lib-specific subclasses.
00032 
00033 // include files
00034 #include "Message/Communicate.h"
00035 #include "Message/Message.h"
00036 #include "Message/CRC.h"
00037 #include "PETE/IpplExpressions.h"
00038 #include "Profile/Profiler.h"
00039 #include "Utility/IpplInfo.h"
00040 #include "Utility/IpplStats.h"
00041 #include "Utility/RandomNumberGen.h"
00042 #include "Utility/PAssert.h"
00043 #include <stdio.h>
00044 
00045 
00047 // print summary of this class to the given output stream
00048 ostream& operator<<(ostream& o, const Communicate& c)
00049 {
00050   TAU_PROFILE("operator<<()", "ostream (ostream, Communicate)", 
00051               TAU_MESSAGE | TAU_IO);
00052 
00053   o << "Parallel communication method: " << c.name() << "\n";
00054   o << "  Total nodes: " << c.getNodes() << ", Current node: ";
00055   o << c.myNode() << "\n";
00056   o << "  Queued received messages: ";
00057   o << c.getReceived() <<"\n";
00058 
00059   return o;
00060 }
00061 
00062 
00064 // Constructor.
00065 //      arguments: command-line args, and number of processes
00066 // to start (if < 0, start the 'default' number, i.e. the number of
00067 // hosts in a PVM virtual machine, the number of nodes in an O2K, etc)
00068 // Note: The base-class constructor does not need the argument info or
00069 // the number of nodes, it just by default sets the number of nodes=1
00070 // Also note: the derived classes should erase Contexts and Processes, and
00071 // put in the proper values.
00072 Communicate::Communicate(int, char **, int)
00073   : nextMsgNum(1)
00074 {
00075   TAU_PROFILE("Communicate::Communicate()", "void (int, char **, int)",
00076               TAU_MESSAGE);
00077 
00078   // initialize data for Communicate
00079   TotalNodes = 1;
00080   myHost = 0;
00081   ErrorStatus = COMM_NOERROR;
00082   Contexts.push_back(1);
00083   Processes.push_back(Contexts); // using Contexts is just convenient here
00084 }
00085 
00086 
00088 // Destructor.  Nothing to do at present.
00089 Communicate::~Communicate(void)
00090 {
00091   TAU_PROFILE("Communicate::~Communicate()", "void ()", TAU_MESSAGE); 
00092 
00093   // delete the cached messages
00094   SentCache_t::iterator cachei = sentMsgCache.begin();
00095   for ( ; cachei != sentMsgCache.end(); ++cachei)
00096     (*cachei).second.freebuf();
00097 }
00098 
00099 
00101 // Add a new on-node message to the linked list.  Return success.
00102 bool Communicate::add_msg(Message *msg, int node, int tag)
00103 {
00104   TAU_PROFILE("Communicate::add_msg()", "bool (Message *, int, int)",
00105               TAU_MESSAGE);
00106 
00107   recMsgList.push_back(MessageData(node, tag, msg));
00108   return true;
00109 }
00110 
00111 
00113 // Looks for a message in the message queue from the specified node
00114 // and tag.  This understands wildcards for node and tag.
00115 // Returns a pointer to the Message object found, and sets node and
00116 // tag equal to the proper values.  Also, this will remove the item from
00117 // the queue.
00118 Message* Communicate::find_msg(int& node, int& tag)
00119 {
00120   TAU_PROFILE("Communicate::find_msg()", "Message* (int&, int&)",
00121               TAU_MESSAGE);
00122 
00123   // just find the first message that meets the criteria
00124   vector<MessageData>::iterator qi   = recMsgList.begin();
00125   vector<MessageData>::iterator qend = recMsgList.end();
00126   for ( ; qi != qend ; ++qi) {
00127     if ((node == COMM_ANY_NODE || (*qi).node == node) &&
00128         (tag  == COMM_ANY_TAG  || (*qi).tag  == tag)) {
00129       node = (*qi).node;
00130       tag = (*qi).tag;
00131       Message *retval = (*qi).msg;
00132       recMsgList.erase(qi);
00133       //INCIPPLSTAT(incMessageReceived);
00134       //INCIPPLSTAT(incMessageReceivedFromQueue);
00135       return retval;
00136     }
00137   }
00138 
00139   // if we're here, no message was found
00140   return 0;
00141 }
00142 
00143 
00145 // Default version of virtual send function ... here, does nothing.
00146 bool Communicate::mysend(Message *, int, int, int)
00147 {
00148   TAU_PROFILE("Communicate::mysend()", "bool (Message *, int, int, int)",
00149               TAU_MESSAGE);
00150 
00151   // just return false, since we cannot send a message with this function
00152   return false;
00153 }
00154 
00155 
00157 // Default version of virtual receive function ... here, does nothing.
00158 Message* Communicate::myreceive(int&, int&, int)
00159 {
00160   TAU_PROFILE("Communicate::myreceive()", "Message* (int, int, int)",
00161               TAU_MESSAGE);
00162 
00163   // just return NULL, since we cannot find a message with this function
00164   return 0;
00165 }
00166 
00167 
00169 // Default version of virtual barrier function ... here, does nothing.
00170 void Communicate::mybarrier(void)
00171 {
00172   TAU_PROFILE("Communicate::mybarrier()", "void ()", TAU_MESSAGE);
00173 
00174   // just return NULL, since we cannot find a message with this function
00175   return;
00176 }
00177 
00178 
00180 // resent a message buffer that has been previously packed and copied
00181 // into the provided buffer.  Return success.
00182 bool Communicate::resend(void *, int, int, int)
00183 {
00184   TAU_PROFILE("Communicate::resend()", "void (void *, int, int, int)",
00185               TAU_MESSAGE);
00186 
00187   // just return false, since we cannot resend a message with this function
00188   return false;
00189 }
00190 
00191 
00193 // Send data to the given node, with given tag.  If delmsg==true, the
00194 // message will be deleted after it is sent, otherwise it will be left alone.
00195 bool Communicate::send(Message *msg, int node, int tag, bool delmsg)
00196 {
00197   TAU_PROFILE("Communicate::send()", "bool (Message *, int, int, bool)",
00198               TAU_MESSAGE);
00199 
00200   bool retval;
00201 
00202   // process list of resend requests
00203   //process_resend_requests();
00204 
00205   // check for problems ...
00206   if ( node < 0 || node >= getNodes() || tag < 0 ) {
00207     ERRORMSG("Communicate: illegal send node " << node << endl);
00208     ErrorStatus = COMM_ERROR;
00209     return false;
00210   }
00211 
00212   // if the message is addressed to this node, put it in the local receive
00213   // queue immediately
00214   if ( node == myNode() ) {
00215     retval = add_msg(msg, node, tag);
00216     //INCIPPLSTAT(incMessageSent);
00217     //INCIPPLSTAT(incMessageSentToSelf);
00218   } else {
00219     // the message must be sent elsewhere ... call the proper function
00220     retval = mysend(msg, node, tag, COMM_SEND_TAG);
00221 
00222     // if the send was successful, delete the message if requested
00223     if (retval) {
00224       //INCIPPLSTAT(incMessageSent);
00225       //INCIPPLSTAT(incMessageSentToOthers);
00226       if (delmsg)
00227         delete msg;
00228     }
00229   }
00230 
00231   // set error code
00232   ErrorStatus = (retval != 0 ? COMM_NOERROR : COMM_NOSEND);
00233 
00234   // return the success of the operation
00235   return retval;
00236 }
00237 
00238 
00240 // Receive data from another node.  Returns newly created Message object
00241 // with received message, or NULL if no message is available.
00242 // If node == COMM_ANY_NODE, this will receive the next message with the given
00243 // tag from any node.
00244 // If tag == COMM_ANY_TAG, this will receive the next message with
00245 // any tag from the given node.  If both are wildcards, this will receive the
00246 // next message, period.  node and tag are passed by reference; if either
00247 // is a wildcard, and a message is received, they are changed to their actual
00248 // values.
00249 // Messages are searched for in this order (if node == COMM_ANY_NODE) :
00250 //      1. Pending in network
00251 //      2. In receive queue
00252 Message* Communicate::receive(int& node, int& tag)
00253 {
00254   TAU_PROFILE("Communicate::receive()", "Message* (int, int)", TAU_MESSAGE);
00255 
00256   //Inform dbgmsg("Comm::receive", INFORM_ALL_NODES);
00257   //dbgmsg << "Doing receive from node " << node << ", tag " << tag << endl;
00258 
00259   // process list of resend requests
00260   process_resend_requests();
00261 
00262   // do a check for a message from another node
00263   //dbgmsg << "Checking for queued message ..." << endl;
00264   Message *msg = find_msg(node, tag);
00265   //dbgmsg << "Found one? " << (msg != 0 ? "yes" : "no") << endl;
00266 
00267   if (msg == 0 && myNode() != node) {
00268     int checknode = node;
00269     int checktag = tag;
00270     //INCIPPLSTAT(incMessageReceiveChecks);
00271     //dbgmsg << "Checking for remote message ..." << endl;
00272     if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0) {
00273       // see if the message matches our criteria for searching
00274       //dbgmsg << "Message found from node " << checknode << " with tag ";
00275       //dbgmsg << checktag << endl;
00276       if ((node != COMM_ANY_NODE && node != checknode) ||
00277           (tag  != COMM_ANY_TAG  && tag  != checktag )) {
00278         // the message does not match; queue it and report no msg found
00279         //dbgmsg << "But it's not what we want." << endl;
00280         add_msg(msg, checknode, checktag);
00281         msg = 0;
00282       } else {
00283         // the message matches; save the node and tag and return the msg
00284         //dbgmsg << "And it is what we want!" << endl;
00285         node = checknode;
00286         tag = checktag;
00287         //INCIPPLSTAT(incMessageReceived);
00288         //INCIPPLSTAT(incMessageReceivedFromNetwork);
00289       }
00290     } else {
00291       //INCIPPLSTAT(incMessageReceiveChecksFailed);
00292     }
00293   }
00294 
00295   // set error code
00296   ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
00297 
00298   // return the message, or NULL if none was found
00299   return msg;
00300 }
00301 
00302 
00304 // A blocking version of receive.
00305 Message *Communicate::receive_block(int& node, int &tag)
00306 {
00307   TAU_TYPE_STRING(taustr, "Message *(int, int)" );
00308   TAU_PROFILE("Communicate::receive_block()", taustr, TAU_MESSAGE);
00309   TAU_PROFILE_TIMER(findtimer, "  Comm::recblock::find", taustr, TAU_MESSAGE);
00310   TAU_PROFILE_TIMER(looptimer, "  Comm::recblock::loop", taustr, TAU_MESSAGE);
00311 
00312   // process list of resend requests
00313   process_resend_requests();
00314 
00315   //Inform dbgmsg("Comm::receive_block", INFORM_ALL_NODES);
00316   //dbgmsg << "Doing blocking receive from node " << node;
00317   //dbgmsg << ", tag " << tag << endl;
00318 
00319   // If we haven't already found a message, check the local messages
00320   //dbgmsg << "Checking for queued message ..." << endl;
00321   TAU_PROFILE_START(findtimer);
00322   Message *msg = find_msg(node, tag);
00323   TAU_PROFILE_STOP(findtimer);
00324   //dbgmsg << "Found one? " << (msg != 0 ? "yes" : "no") << endl;
00325 
00326   // keep checking for remote msgs until we get one
00327   TAU_PROFILE_START(looptimer);
00328   if (myNode() != node) {
00329     while (msg == 0) {
00330       // process list of resend requests
00331       process_resend_requests();
00332 
00333       int checknode = node;
00334       int checktag = tag;
00335       //INCIPPLSTAT(incMessageReceiveChecks);
00336       //dbgmsg << "Checking for remote message ..." << endl;
00337       if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0) {
00338         // see if the message matches our criteria for searching
00339         //dbgmsg << "Message found from node " << checknode << " with tag ";
00340         //dbgmsg << checktag << endl;
00341         if ((node != COMM_ANY_NODE && node != checknode) ||
00342             (tag  != COMM_ANY_TAG  && tag  != checktag )) {
00343           // the message does not match; queue it and report no msg found
00344           //dbgmsg << "But it's not what we want." << endl;
00345           add_msg(msg, checknode, checktag);
00346           msg = 0;
00347         } else {
00348           // the message matches; save the node and tag and return the msg
00349           //dbgmsg << "And it is what we want!" << endl;
00350           node = checknode;
00351           tag = checktag;
00352           //INCIPPLSTAT(incMessageReceived);
00353           //INCIPPLSTAT(incMessageReceivedFromNetwork);
00354         }
00355       } else {
00356         //INCIPPLSTAT(incMessageReceiveChecksFailed);
00357         if (Ippl::retransmit())
00358           msg = find_msg(node, tag);
00359       }
00360     }
00361   }
00362   TAU_PROFILE_STOP(looptimer);
00363 
00364   // If we're on just one node, and we did not find a message, this is
00365   // a big problem.
00366   PInsist(!(myNode() == node && msg == 0),
00367           "Local message not found in Communicate::receive_block!!");
00368 
00369   // set error code
00370   ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
00371 
00372   // return the message, or NULL if none was found
00373   return msg;
00374 }
00375 
00376 
00378 // Broadcast the given message to ALL nodes, including this node.
00379 // Return number of nodes sent to.
00380 // Arguments are the Message, and the tag for the message.
00381 int Communicate::broadcast_all(Message *msg, int tag)
00382 {
00383   TAU_PROFILE("Communicate::broadcast_all()", "int (Message *, int)",
00384               TAU_MESSAGE);
00385   int i;                        // loop variable
00386 
00387   // send message to all other nodes
00388   for (i=(getNodes() - 1); i >= 0; i--) {
00389     if (i != myNode()) {
00390       mysend(msg, i, tag, COMM_SEND_TAG);
00391       //INCIPPLSTAT(incMessageSent);
00392       //INCIPPLSTAT(incMessageSentToOthers);
00393     }
00394   }
00395 
00396   // send message to this node; since we do this, don't need to delete msg
00397   add_msg(msg, myNode(), tag);
00398   //INCIPPLSTAT(incMessageSent);
00399   //INCIPPLSTAT(incMessageSentToSelf);
00400 
00401   return getNodes();
00402 }
00403 
00404 
00406 // Broadcast the given message to all OTHER nodes, but not this node.
00407 // Return number of nodes sent to.
00408 // Arguments are the Message, and the tag for the message, and whether
00409 // we should delete the given message object.
00410 int Communicate::broadcast_others(Message *msg, int tag, bool delmsg)
00411 {
00412   TAU_PROFILE("Communicate::broadcast_others()", "int (Message *, int, bool)",
00413               TAU_MESSAGE);
00414   int i;                        // loop variable
00415 
00416   // send message to all other nodes
00417   for (i=(getNodes() - 1); i >= 0; i--) {
00418     if (i != myNode()) {
00419       mysend(msg, i, tag, COMM_SEND_TAG);
00420       //INCIPPLSTAT(incMessageSent);
00421       //INCIPPLSTAT(incMessageSentToOthers);
00422     }
00423   }
00424 
00425   // delete message
00426   if (delmsg)
00427     delete msg;
00428 
00429   return getNodes() - 1;
00430 }
00431 
00432 
00434 // Synchronize all processors (everybody waits for everybody
00435 // else to get here before returning to calling function).
00436 void Communicate::barrier()
00437 {
00438   TAU_PROFILE("Communicate::barrier()", "void ()", TAU_MESSAGE);
00439 
00440   mybarrier();
00441   //INCIPPLSTAT(incBarriers);
00442 }
00443 
00444 
00446 // clean up after a Message has been used (called by Message).  By
00447 // default, does nothing.
00448 void Communicate::cleanupMessage(void *) { }
00449 
00450 
00452 // calculate how big the buffer must be to send the given message
00453 int Communicate::find_msg_length(Message &msg)
00454 {
00455   TAU_PROFILE("Communicate::find_msg_length()", "int (Message)",
00456               TAU_MESSAGE);
00457 
00458   static const int longsize = wordround(sizeof(MsgNum_t));
00459   static const int intsize4 = wordround(4 * sizeof(int));
00460   static const int intsize2 = wordround(2 * sizeof(int));
00461 
00462   // the message contains a long and three integers at the start with the
00463   // msg num, node, tag, and number of items
00464   int buffsize = longsize + intsize4;
00465 
00466   // now include the sizes of the elements themselves.  For each item,
00467   // we also include two integers with size information.
00468   int nitems = msg.size();
00469   for (int i=0; i < nitems; ++i)
00470     buffsize += (intsize2 + wordround(msg.item(i).numBytes()));
00471 
00472   // if checksums are to be performed, add in space for the 32-bit checksum
00473   if (Ippl::useChecksums())
00474     buffsize += sizeof(CRCTYPE);
00475 
00476   return buffsize;
00477 }
00478 
00479 
00481 // put data from the given Message into the given buffer
00482 void Communicate::fill_msg_buffer(void *buffer, Message &msg, int tag,
00483                                   int bufsize, int node)
00484 {
00485   TAU_PROFILE("Communicate::fill_msg_buffer()",
00486               "void (void *, Message, int, int, int)", TAU_MESSAGE);
00487 
00488   void *pos = buffer;             // location in buffer to pack data
00489   int nitems = msg.size();        // Number of items in Message
00490   int mdata[4];                   // Array to store msg header info
00491   MsgNum_t mnum = (nextMsgNum++); // Message ID
00492 
00493   //Inform dbgmsg("***Communicate::fill_msg_buffer", INFORM_ALL_NODES);
00494   //dbgmsg << "Preparing to send out message " << mnum;
00495   //dbgmsg << " with tag " << tag << " of size " << bufsize << endl;
00496 
00497   // we must make sure to zero out the buffer if we're using checksums,
00498   // so that random data values do not occur in the spaces where word
00499   // alignment padding is used
00500   if (Ippl::useChecksums())
00501     memset(pos, 0, bufsize);
00502 
00503   // put message ID info into the buffer
00504   pack(&mnum, pos, sizeof(MsgNum_t));
00505 
00506   // put message header info into the buffer
00507   mdata[0] = tag;
00508   mdata[1] = myNode();
00509   mdata[2] = nitems;
00510   mdata[3] = bufsize;
00511   pack(mdata, pos, 4*sizeof(int));
00512 
00513   // finally pack in the data
00514   for (int i=0; i < nitems; ++i) {
00515     Message::MsgItem &msgitem = msg.item(i);
00516     mdata[0] = msgitem.numElems();
00517     mdata[1] = msgitem.numBytes();
00518     pack(mdata, pos, 2*sizeof(int));
00519     if (mdata[1] > 0)
00520       pack(msgitem.data(), pos, mdata[1]);
00521   }
00522 
00523   // if checksums are on, find the checksum and append it to the buffer
00524   if (Ippl::useChecksums()) {
00525     // calculate the crc
00526     int crcsize = bufsize - sizeof(CRCTYPE);
00527     CRCTYPE crcval = crc(buffer, crcsize);
00528 
00529     // append it to the end of the buffer
00530     *(static_cast<CRCTYPE *>(pos)) = crcval;
00531 
00532     // if we're trying to retransmit, cache the message
00533     if (Ippl::retransmit()) {
00534       if (tag != IPPL_RETRANSMIT_TAG && tag != IPPL_MSG_OK_TAG) {
00535         //dbgmsg << "Adding message " << mnum << " of size " << bufsize;
00536         //dbgmsg << " with tag " << tag << " to sent cache." << endl;
00537         add_to_send_cache(buffer, mnum, bufsize, node);
00538       } else {
00539         //dbgmsg << "NOT adding msg with tag " << tag << " to cache" << endl;
00540       }
00541     }
00542   }
00543 
00544   ADDIPPLSTAT(incMessageBytesSent,bufsize);
00545 }
00546 
00547 
00549 // get data out of a buffer and create a Message
00550 Message* Communicate::unpack_message(int &node, int &tag, void *buffer)
00551 {
00552   TAU_PROFILE("Communicate::unpack_message()",
00553               "Message* (int, int, long, void*)", TAU_MESSAGE);
00554 
00555   Message *newmsg = 0;
00556 
00557   // pos will always point to the next location in the buffer to get data
00558   void *pos = buffer;
00559 
00560   // get the message ID number
00561   MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
00562   addwordround(pos, sizeof(MsgNum_t));
00563 
00564   // get the tag, sender, and number of messages
00565   int *mdata = static_cast<int *>(pos);
00566   tag = mdata[0];
00567   node = mdata[1];
00568   int nitems = mdata[2];
00569   int bufsize = mdata[3];
00570   addwordround(pos, 4*sizeof(int));
00571 
00572   //WARNMSG("Received message " << mnum << " from node " << node);
00573   //WARNMSG(" with tag " << tag << " of size " << bufsize << endl);
00574 
00575   // check for special tags, to abort, retransmit, or just receive
00576   if (tag == IPPL_ABORT_TAG) {
00577     ERRORMSG("Stopping due to abort request sent from node " << node << endl);
00578     ::abort();
00579 
00580   } else if (tag == IPPL_EXIT_TAG) {
00581     ERRORMSG("Exiting due to exit request sent from node " << node << endl);
00582     ::exit(1);
00583 
00584   } else if (tag == IPPL_RETRANSMIT_TAG) {
00585     // get the retransmit message number and tag out of the current buffer
00586     unpack_retransmission_request(nitems, pos);
00587 
00588   } else if (tag == IPPL_MSG_OK_TAG) {
00589     // clear out the messages that this message lists are OK to be deleted
00590     clear_ok_messages(nitems, pos);
00591 
00592   } else {
00593     // this is just a regular message
00594 
00595     // do checksum comparison, if checksums are on
00596     if (Ippl::useChecksums()) {
00597       // calculate the crc
00598       int crcsize = bufsize - sizeof(CRCTYPE);
00599       CRCTYPE crcval = crc(buffer, crcsize);
00600 
00601       // as a test, randomly change crcval
00602       //if (IpplRandom() < 0.1)
00603       //  crcval += 1;
00604 
00605       // compare this crc to the original one
00606       void *origloc = static_cast<void *>(static_cast<char *>(buffer)+crcsize);
00607       CRCTYPE origcrc = *(static_cast<CRCTYPE *>(origloc));
00608       if (crcval != origcrc) {
00609         ERRORMSG("Failed CRC check (" << crcval << " != " << origcrc);
00610         ERRORMSG(") on node " << Ippl::myNode());
00611         ERRORMSG(" for message " << mnum << " of size " << bufsize);
00612         ERRORMSG(" bytes sent from node ");
00613         ERRORMSG(node << " with tag " << tag << endl);
00614         if (Ippl::retransmit()) {
00615           // send off a request to have message 'mnum' resent to us by 'node'
00616           requestList.push_back(pair<int,MsgNum_t>(node, mnum));
00617         } else {
00618           // since we're not trying to retransmit, we just quit.
00619           PInsist(crcval == origcrc, "Exiting due to CRC check failure.");
00620         }
00621 
00622         // and then return 0 so that the caller knows there was a problem
00623         return 0;
00624       }
00625     }
00626 
00627     // if we're here, the checksums (if enabled) were OK, so receive the
00628     // message
00629 
00630     // create data structure for this message
00631     newmsg = new Message(nitems);
00632 
00633     // get all the items and add to the message
00634     for (int j = 0; j < nitems; j++) {
00635       int *hdr = static_cast<int *>(pos);
00636       int elements = hdr[0];
00637       int bytesize = hdr[1];
00638       addwordround(pos, 2*sizeof(int));
00639 
00640       // for each item, find the pointer to the actual data and give
00641       // that pointer to the Message object.  The Message object then
00642       // does not delete the data until the very end, when the Message
00643       // is deleted.
00644       if (bytesize > 0 && elements > 0) {
00645         newmsg->setCopy(false);
00646         newmsg->setDelete(false);
00647         newmsg->putmsg(pos, bytesize/elements, elements);
00648         addwordround(pos, bytesize);
00649       }
00650     }
00651 
00652     // indicate we've received a normal message
00653     ADDIPPLSTAT(incMessageBytesReceived,bufsize);
00654 
00655     // tell the sender that we received this message OK
00656     if (Ippl::retransmit())
00657       informOKList.push_back(pair<int,MsgNum_t>(node, mnum));
00658   }
00659 
00660   // return the new message, or zero to indicate the buffer contained
00661   // something else than an actual message
00662   return newmsg;
00663 }
00664 
00665 
00667 // put the given message buffer in the sent-message cache, as a new
00668 // CommSendInfo object storing the buffer and other information.
00669 void Communicate::add_to_send_cache(void *msgbuf, MsgNum_t mnum, int msgsize,
00670                                     int node)
00671 {
00672   TAU_PROFILE("Communicate::add_to_send_cache",
00673               "void (void*, MsgNum_t, int)", TAU_MESSAGE);
00674 
00675   // make sure we do not already have this message
00676   SentCache_t::iterator senti = sentMsgCache.find(mnum);
00677   if (senti != sentMsgCache.end()) {
00678     ERRORMSG("ERROR: Trying to cache an already-cached message with num = ");
00679     ERRORMSG(mnum << endl);
00680     return;
00681   }
00682 
00683   //Inform dbgmsg("***Communicate::add_to_send_cache", INFORM_ALL_NODES);
00684   //dbgmsg << "Adding message " << mnum << " to cache with size = " << msgsize;
00685   //dbgmsg << endl;
00686 
00687   // make a copy of the message
00688   char *copybuf = new char[msgsize];
00689   memcpy(copybuf, msgbuf, msgsize);
00690 
00691   // add the message to the cache list
00692   CommSendInfo csi(msgsize, copybuf, node);
00693   sentMsgCache.insert(SentCache_t::value_type(mnum, csi));
00694 
00695   //dbgmsg<<"Sent message cached; now " << sentMsgCache.size() << " buffers ";
00696   //dbgmsg << "in the cache." << endl;
00697 }
00698 
00699 
00701 // send off a request to have this message retransmitted to us
00702 void Communicate::request_retransmission(int node, MsgNum_t mnum)
00703 {
00704   Inform dbgmsg("***Communicate::request_retransmission", INFORM_ALL_NODES);
00705   dbgmsg << "Requesting retransmission of message " << mnum << " from node ";
00706   dbgmsg << node << endl;
00707 
00708   // create a regular message, but with the
00709   // special retransmit tag and the message number as the one item
00710   // in the Message
00711   Message msg(1);
00712   msg.put(mnum);
00713   send(&msg, node, IPPL_RETRANSMIT_TAG, false); // does not delete message
00714 }
00715 
00716 
00718 // get the resend information from a buffer sent in a message requesting
00719 // retransmission
00720 void Communicate::unpack_retransmission_request(int nitems, void *pos)
00721 {
00722   Inform dbgmsg("***Communicate::unpack_retrans_req", INFORM_ALL_NODES);
00723   //dbgmsg << "Unpacking retransmission request ..." << endl;
00724 
00725   // retransmission messages have the following information as separate
00726   // items:
00727   //   message number to retransmit (type == MsgNum_t)
00728   // so, nitems should be one, and the bytesize should match
00729   PInsist(nitems == 1, "Wrong number of items in retransmit request.");
00730 
00731   // get the retransmit message number item header info
00732   int *hdr = static_cast<int *>(pos);
00733   PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
00734           "Wrong message info for retransmit message number.");
00735   addwordround(pos, 2*sizeof(int));
00736 
00737   // get the actual retransmit message number
00738   MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
00739   dbgmsg << "Received request to resend message " << mnum << endl; 
00740   resendList.push_back(mnum);
00741 }
00742 
00743 
00745 // for message mnum, resend the data
00746 void Communicate::perform_resend(MsgNum_t mnum)
00747 {
00748   // get the message info from our cache.
00749   SentCache_t::iterator senti = sentMsgCache.find(mnum);
00750   PInsist(senti != sentMsgCache.end(),
00751           "Could not find message in local sent cache to retransmit.");
00752 
00753   // get the node and size
00754   int size = (*senti).second.size();
00755   int node = (*senti).second.node();
00756 
00757   // resend the data
00758   ERRORMSG("WARNING: Resending message number " << mnum);
00759   ERRORMSG(" of size " << size << " from node ");
00760   ERRORMSG(myNode() << " to node " << node << " ..." << endl);
00761   resend((*senti).second.buf(), size, node, COMM_SEND_TAG);
00762 }
00763 
00764  
00766 // tell the sender that we received this message OK
00767 void Communicate::send_ok_message(int node, MsgNum_t mnum)
00768 {
00769   Inform dbgmsg("***Communicate::send_ok_message", INFORM_ALL_NODES);
00770   dbgmsg << "Informing node " << node << " that message " << mnum;
00771   dbgmsg << " was received ok." << endl;
00772 
00773   Message msg(1);
00774   msg.put(mnum);                // the list of message numbers, one at a time
00775 
00776   send(&msg, node, IPPL_MSG_OK_TAG, false); // does not delete message
00777 }
00778 
00779 
00781 // unpack message with a list of OK message numbers, and delete them
00782 // from our cache
00783 void Communicate::clear_ok_messages(int nitems, void *pos)
00784 {
00785   Inform dbgmsg("***Communicate::clear_ok_messages", INFORM_ALL_NODES);
00786   //dbgmsg << "Unpacking messages-ok information for " << nitems;
00787   //dbgmsg << " messages ..." << endl;
00788 
00789   // message-ok messages have the following information as separate
00790   // items:
00791   //   the number of OK messages (type == int)
00792   //   the first OK message number (type == MsgNum_t)
00793   //   the second OK message number (type == MsgNum_t)
00794   //   etc
00795   PInsist(nitems >= 1, "Wrong number of items in retransmit request.");
00796 
00797   // loop through the list of items, get the message number from each,
00798   // and remove that message from our queue
00799   for (int i=0; i < nitems; ++i) {
00800     // get the message-ok header
00801     int *hdr = static_cast<int *>(pos);
00802     PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
00803             "Wrong message info for message-ok number.");
00804     addwordround(pos, 2*sizeof(int));
00805 
00806     // get the message-ok number
00807     MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
00808     addwordround(pos, sizeof(MsgNum_t));
00809 
00810     // add this number to our list of messages to say are OK
00811     dbgmsg << "Will clear message " << mnum << " as OK." << endl;
00812     sentOKList.push_back(mnum);
00813   }
00814 }
00815 
00816 
00818 // unpack message with a list of OK message numbers, and delete them
00819 // from our cache
00820 void Communicate::remove_single_ok_message(MsgNum_t mnum)
00821 {
00822   Inform dbgmsg("***Communicate::remove_single_ok_message", INFORM_ALL_NODES);
00823 
00824   // check if we have that message
00825   SentCache_t::iterator senti = sentMsgCache.find(mnum);
00826   if (senti == sentMsgCache.end()) {
00827     // we do not have it; print an error message
00828     ERRORMSG("ERROR: Received 'message ok' for message " << mnum);
00829     ERRORMSG(", but this node does not have that message in cache." << endl);
00830 
00831   } else {
00832     // we have it, so remove it after freeing the buffer
00833     (*senti).second.freebuf();
00834     sentMsgCache.erase(senti);
00835     dbgmsg << "Removed message " << mnum << " from send cache; now ";
00836     dbgmsg << sentMsgCache.size() << " messages in list." << endl;
00837   }
00838 }
00839 
00840 
00842 // process list of resend requests
00843 void Communicate::process_resend_requests()
00844 {
00845   if (resendList.size() > 0) {
00846     Inform dbgmsg("***Communicate::process_resend_reqs", INFORM_ALL_NODES);
00847     dbgmsg << "Clearing " << sentOKList.size() << " and resending ";
00848     dbgmsg << resendList.size() << " messages ..." << endl;
00849   }
00850 
00851   // clear out OK messages
00852   while (sentOKList.size() > 0) {
00853     MsgNum_t mnum = *(sentOKList.begin());
00854     sentOKList.erase(sentOKList.begin());
00855     remove_single_ok_message(mnum);
00856   }
00857 
00858   // resend a message, if necessary
00859   while (resendList.size() > 0) {
00860     MsgNum_t mnum = *(resendList.begin());
00861     resendList.erase(resendList.begin());
00862     perform_resend(mnum);
00863   }
00864 
00865   // inform other nodes that we've received their messages ok
00866   while (informOKList.size() > 0) {
00867     int node = (*(informOKList.begin())).first;
00868     MsgNum_t mnum = (*(informOKList.begin())).second;
00869     informOKList.erase(informOKList.begin());
00870     send_ok_message(node, mnum);
00871   }
00872     
00873   // request resends from other nodes
00874   while (requestList.size() > 0) {
00875     int node = (*(requestList.begin())).first;
00876     MsgNum_t mnum = (*(requestList.begin())).second;
00877     requestList.erase(requestList.begin());
00878     request_retransmission(node, mnum);
00879   }
00880 }
00881 
00882 
00883 /***************************************************************************
00884  * $RCSfile: Communicate.cpp,v $   $Author: adelmann $
00885  * $Revision: 1.1.1.1 $   $Date: 2003/01/23 07:40:28 $
00886  * IPPL_VERSION_ID: $Id: Communicate.cpp,v 1.1.1.1 2003/01/23 07:40:28 adelmann Exp $ 
00887  ***************************************************************************/

Generated on Mon Jan 16 13:23:51 2006 for IPPL by  doxygen 1.4.6