src/Message/GlobalComm.cpp

// -*- C++ -*-
/***************************************************************************
 *
 * The IPPL Framework
 *
 * This program was prepared by PSI.
 * All rights in the program are reserved by PSI.
 * Neither PSI nor the author(s) makes any warranty, express or implied,
 * or assumes any liability or responsibility for the use of this software.
 *
 * Visit http://www.acl.lanl.gov/POOMS and
 * http://people.web.psi.ch/adelmann/ for more details
 *
 ***************************************************************************/

// include files
#include "Message/GlobalComm.h"
#include "Message/Communicate.h"
#include "Message/Message.h"
#include "Message/Tags.h"
#include "Utility/IpplInfo.h"
#include "Utility/IpplStats.h"
#include "Profile/Profiler.h"

#ifdef IPPL_STDSTL
// Standard STL names this file "algorithm"
#include <algorithm>
using namespace std;
#else
#include <algo.h>
#endif // IPPL_STDSTL


// Reduce equally-sized arrays across the machine, by sending to node
// 0 and broadcasting back the result.  The arguments are two begin,end
// iterators for the source of the data, an iterator pointing to
// where the summed data should go, and an operation to perform in
// the reduction ... this last should be one of the PETE template
// classes found in either ../Expression/Applicative.h or
// ../Expression/TypeComputations.h.  These are simple classes such as
// OpAnd(), OpMax(), OpAdd(), OpMultiply(), etc.
// Return success.
// The final argument indicates whether the LOCAL NODE should have its
// values included in the reduction (by default, this is true).  If this
// pointer to the boolean array is null, all the values will be included.
// NOTE: The input iterators must iterate over simple data objects,
// which do not require their own special getMessage/putMessage.  If you
// need to reduce a complex quantity, use the scalar version of reduce.
template <class InputIterator, class OutputIterator, class ReduceOp>
bool reduce(Communicate& comm, InputIterator s1, InputIterator s2,
            OutputIterator t1, const ReduceOp& op, bool *IncludeVal) {

  TAU_TYPE_STRING(profileString, "bool (Communicate, " + CT(s1) + ", "
    + CT(s2) + " )");

  TAU_PROFILE("reduce()", profileString, TAU_MESSAGE);

  // Inform dbgmsg("reduce-vector", INFORM_ALL_NODES);

  // determine destination node and tags
  int parent = 0;
  int sendtag = comm.next_tag(COMM_REDUCE_SEND_TAG, COMM_REDUCE_CYCLE);
  int rectag  = comm.next_tag(COMM_REDUCE_RECV_TAG, COMM_REDUCE_CYCLE);

  // determine how many elements we have to reduce
  unsigned int elements = 0;
  for (InputIterator tmps1 = s1; tmps1 != s2; ++tmps1, ++elements);
  if (elements == 0) {
    ERRORMSG("reduce: nothing to reduce." << endl);
  }

  // create flags, if they are not provided
  bool *useFlags = IncludeVal;
  if (useFlags == 0 && elements > 0) {
    useFlags = new bool[elements];
    for (unsigned int u=0; u < elements; useFlags[u++] = true);
  }

  if ( comm.myNode() != parent ) {
    // send the source data to node 0 if we are not node 0
    Message *msg = new Message;
    // dbgmsg << "sending message with " << elements << " elements to node ";
    // dbgmsg << parent << " with tag " << sendtag << endl;
    ::putMessage(*msg, elements);
    if (elements > 0) {
      ::putMessage(*msg, s1, s2);
      ::putMessage(*msg, useFlags, useFlags + elements);
    }
    if ( ! comm.send(msg, parent, sendtag) ) {
      Ippl::abort("reduce: cannot send reduce buffers.");
    }

    // then we get the results back
    msg = comm.receive_block(parent, rectag);
    // dbgmsg << "received message with size = " << msg->size();
    // dbgmsg << " from node " << parent << " with tag " << rectag << endl;
    if ( ! msg || msg->size() < 1 )
      Ippl::abort("reduce: cannot receive reduce results.");
    getMessage(*msg, *t1);
    delete msg;

  } else {
    // first copy the source into the target; this is like receiving
    // from ourselves
    InputIterator  tmp1  = s1;
    OutputIterator t2    = t1;
    bool*          copyf = useFlags;
    for ( ; tmp1 != s2; ++tmp1, ++t2, ++copyf)
      if (*copyf)
        *t2 = *tmp1;

    // the parent receives all the messages and then broadcasts the
    // reduced result
    int notReceived = comm.getNodes() - 1;
    while (notReceived > 0) {
      // receive message
      int fromnode = COMM_ANY_NODE;
      Message *recmsg = comm.receive_block(fromnode, sendtag);
      // dbgmsg << "received message with size = " << recmsg->size();
      // dbgmsg << " from node " << fromnode << " with tag "<<sendtag<<endl;
      if ( ! recmsg || recmsg->size() < 1 )
        Ippl::abort("reduce: cannot receive reduce buffers.");

      // get data from message
      unsigned int recelems;
      ::getMessage(*recmsg, recelems);
      if (recelems != elements)
        Ippl::abort("reduce: mismatched element count in vector reduction.");
      if (elements > 0) {
        InputIterator reci = (InputIterator)(recmsg->item(0).data());
        bool *recflag = (bool *)(recmsg->item(1).data());

        // the target buffer must have size >= size of the source, so
        // we can iterate over the source iterator
        unsigned int u;
        for (u=0, t2=t1; u < elements; ++t2, ++reci, ++u) {
          if (recflag[u]) {
            if (useFlags[u]) {
              PETE_apply(op, *t2, *reci);
            } else {
              *t2 = *reci;
              useFlags[u] = true;
            }
          }
        }
      }

      // finished with this node's data
      delete recmsg;
      notReceived--;
    }

    // Finally, broadcast the results out.  t2 should now point to the
    // end of the target buffer.
    if (comm.getNodes() > 1) {
      Message *sendmsg = new Message();
      putMessage(*sendmsg, t1, t2);
      // dbgmsg << "sending message with size " << sendmsg->size();
      // dbgmsg << " to all nodes with tag " << rectag << endl;
      if (comm.broadcast_others(sendmsg, rectag) != (comm.getNodes() - 1))
        Ippl::abort("reduce: cannot send reduce results.");
    }
  }

  // we're done
  if (useFlags != 0 && useFlags != IncludeVal)
    delete [] useFlags;

  //INCIPPLSTAT(incReductions);
  return true;
}


// same as above, but this uses the default Communicate object
template <class InputIterator, class OutputIterator, class ReduceOp>
bool reduce(InputIterator s1, InputIterator s2,
            OutputIterator t1, const ReduceOp& op, bool *IncludeVal) {

  TAU_TYPE_STRING(profileString, "bool (" + CT(s1) + ", " + CT(s2) + ", "
    + CT(t1) + ", " + CT(op) + ")");
  TAU_PROFILE("reduce()", profileString, TAU_MESSAGE);

  return reduce(*Ippl::Comm, s1, s2, t1, op, IncludeVal);
}
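
// --------------------------------------------------------------------
// Illustrative usage sketch (an addition, not part of the original
// IPPL source): element-wise summation of a small array across all
// nodes.  OpAddAssign is assumed to be the accumulating PETE tag that
// IPPL uses elsewhere for += assignment; the reduction loop above
// applies the operator via PETE_apply(op, target, source), so it must
// update its first argument in place.
static void example_reduce_sum()
{
  double local[4]  = { 1.0, 2.0, 3.0, 4.0 };  // this node's partial sums
  double global[4];                           // receives the totals

  // Collective call: every node must participate.  A null mask pointer
  // means all elements are included; plain doubles satisfy the NOTE
  // above, so no special getMessage/putMessage support is needed.
  reduce(local, local + 4, global, OpAddAssign(), 0);
}
// --------------------------------------------------------------------
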
// masked scalar versions of reduce ... instead of iterators, these versions
// expect a single quantity to reduce and a location to place the result.
// The final argument indicates whether the LOCAL NODE should have its
// value included in the reduction (by default, this is true).
// Return success of operation.
template <class T, class ReduceOp>
bool reduce_masked(Communicate& comm, T& input, T& output,
                   const ReduceOp& op, bool IncludeVal) {

  TAU_TYPE_STRING(profileString, "bool (Communicate, " + CT(input) + ", "
                  + CT(output) + " )");
  TAU_PROFILE("reduce_masked()", profileString, TAU_MESSAGE);

  // Inform dbgmsg("reduce_masked", INFORM_ALL_NODES);

  // determine destination node and tags
  int parent = 0;
  int sendtag = comm.next_tag(COMM_REDUCE_SEND_TAG, COMM_REDUCE_CYCLE);
  int rectag  = comm.next_tag(COMM_REDUCE_RECV_TAG, COMM_REDUCE_CYCLE);

  if (comm.myNode() != parent) {
    // send the source data to node 0 if we are not node 0
    Message *msg = new Message;
    // dbgmsg << "sending message, includeflag=" << IncludeVal << ", to node ";
    // dbgmsg << parent << " with tag " << sendtag << endl;
    ::putMessage(*msg, IncludeVal);
    if (IncludeVal)
      ::putMessage(*msg, input);
    if ( ! comm.send(msg, parent, sendtag) ) {
      Ippl::abort("reduce: cannot send reduce scalar.");
    }

    // then we get the results back
    msg = comm.receive_block(parent, rectag);
    // dbgmsg << "received message with size = " << msg->size();
    // dbgmsg << " from node " << parent << " with tag " << rectag << endl;
    if ( ! msg || msg->size() < 1 )
      Ippl::abort("reduce: cannot receive reduce results.");
    getMessage(*msg, output);
    delete msg;

  } else {
    // first copy the source into the target; this is like receiving
    // from ourselves
    if (IncludeVal)
      output = input;

    // if there are several nodes, we must get the other results
    if (comm.getNodes() > 1) {

      // the parent receives all the messages and then broadcasts the
      // reduced result
      int notReceived = comm.getNodes() - 1;

      // create a temporary array to store values from other nodes
      T *recval = new T[notReceived];
      bool *recflag = new bool[notReceived];

      // get all messages
      while (notReceived > 0) {
        // receive message
        int fromnode = COMM_ANY_NODE;
        Message *recmsg = comm.receive_block(fromnode, sendtag);
        if ( ! recmsg || recmsg->size() < 1 )
          Ippl::abort("reduce: cannot receive reduce buffers.");

        // get flag indicating if the message has any data; if it does,
        // get it and store it
        ::getMessage(*recmsg, recflag[fromnode - 1]);
        if (recflag[fromnode - 1])
          ::getMessage(*recmsg, recval[fromnode - 1]);

        // finished with this node's data
        delete recmsg;
        notReceived--;
      }

      // now loop through the received values and do the reduction
      for (int n=1; n < comm.getNodes(); ++n) {
        if (recflag[n-1]) {
          if (IncludeVal) {
            PETE_apply(op, output, recval[n-1]);
          } else {
            output = recval[n-1];
            IncludeVal = true;
          }
        }
      }

      // done with the temporary storage
      delete [] recflag;
      delete [] recval;
    }

    // Finally, broadcast the reduced scalar back out to all the
    // other nodes.
    if (comm.getNodes() > 1) {
      Message *sendmsg = new Message();
      ::putMessage(*sendmsg, output);
      // dbgmsg << "sending message with size " << sendmsg->size();
      // dbgmsg << " to all nodes with tag " << rectag << endl;
      if (comm.broadcast_others(sendmsg, rectag) != (comm.getNodes() - 1))
        Ippl::abort("reduce: cannot send reduce results.");
    }

    // we're done ... but do a check to see that we reduced SOMETHING
    if (!IncludeVal) {
      WARNMSG("reduce: there was nothing to reduce, since the masks ");
      WARNMSG("were all false." << endl);
    }
  }

  //INCIPPLSTAT(incReductions);
  return true;
}


// same as above, but this uses the default Communicate object
template <class T, class ReduceOp>
bool reduce_masked(T& input, T& output, const ReduceOp& op,
                   bool IncludeVal) {

  TAU_TYPE_STRING(profileString, "bool (" + CT(input) + ", "
    + CT(output) + " )");
  TAU_PROFILE("reduce_masked()", profileString, TAU_MESSAGE);

  return reduce_masked(*Ippl::Comm, input, output, op, IncludeVal);
}
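
// --------------------------------------------------------------------
// Illustrative usage sketch (an addition, not part of the original
// IPPL source): a global maximum over only those nodes whose mask is
// true.  Rather than assuming a particular PETE tag exists, a tiny
// local functor is defined here; any type works as long as
// PETE_apply(op, output, received) accumulates into its first
// argument, which is how the loop in reduce_masked applies it.
struct ExampleMaxAssign { };
template <class T>
inline void PETE_apply(const ExampleMaxAssign&, T& a, const T& b)
{
  if (a < b) a = b;   // accumulate the maximum in place
}

static void example_reduce_masked_max()
{
  double localMax  = 42.0;   // this node's candidate value
  double globalMax = 0.0;    // receives the global maximum
  bool   haveData  = true;   // set false to exclude this node entirely

  // Collective call: every node must participate.  If all masks are
  // false, the parent node prints the "nothing to reduce" warning above.
  reduce_masked(localMax, globalMax, ExampleMaxAssign(), haveData);
}
// --------------------------------------------------------------------
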
// Scatter the data in the given source container to all other nodes.
// The data is read using the first two begin,end iterators, and written
// to the location indicated by the third iterator.  The next two
// arrays are for target nodes and target indices for the data in the
// source array; they should be of the same length as the source array.
// The final argument is a PETE operation (applied via PETE_apply) which
// is used to combine data when two or more items are scattered into the
// same location on the same node.
// Return success of operation.
template <class InputIterator, class RandomIterator, class ScatterOp>
bool scatter(Communicate& comm, InputIterator s1, InputIterator s2,
             RandomIterator t1, int *target_node,
             int *target_position, const ScatterOp& op) {

  TAU_TYPE_STRING(profileString, "bool (Communicate&, " + CT(s1) + ", "
    + CT(s2) + ", " + CT(t1) + ", int *, int *, " + CT(op) + ")");
  TAU_PROFILE("scatter()", profileString, TAU_MESSAGE);

  int i;                        // loop variable
  int tag = comm.next_tag(COMM_REDUCE_SCATTER_TAG, COMM_REDUCE_CYCLE);

  // Create a number of send messages equal to TotalNodes.
  // These messages will be packed with the data from the source
  // array and sent to the node indicated by the target node array;
  // some empty messages will be sent so the receiving node knows when
  // it has received all the messages.
  Message* msg = new Message[comm.getNodes()];

  // Loop over each item of the source array and pack the send messages.
  // The message is packed in pairs; the first element of each pair is
  // an integer representing the array offset in the target, and the
  // second element is the data to be placed in that offset.
  int *tn = target_node;
  int *tp = target_position;
  InputIterator si;
  for ( si = s1; si != s2 ; si++, tn++, tp++ ) {
    if ( *tn < 0 || *tn >= comm.getNodes() ) {
      ERRORMSG("scatter: bad scatter target " << *tn << endl);
      return false;
    }
    //    msg[*tn].put(*tp).put(*si);
    putMessage(msg[*tn], *tp);
    putMessage(msg[*tn], *si);
  }

  // Send out the messages.  We do not delete the messages here after the
  // send, however.
  for ( i = comm.getNodes() - 1; i >= 0; i-- ) {
    if ( ! comm.send(msg + i, i, tag, false) ) {
      ERRORMSG("scatter: cannot send scatter buffer " << i << endl);
      return false;
    }
  }

  // Receive the scatter messages back now.
  int notReceived = comm.getNodes();
  while (notReceived > 0) {
    int fromnode = COMM_ANY_NODE;
    Message *recmsg = comm.receive_block(fromnode, tag);
    if ( ! recmsg ) {
      ERRORMSG("scatter: cannot receive scatter message." << endl);
      return false;
    }

    // for each (pos, val) pair, get it and put results in target storage
    int pairs = recmsg->size() / 2;
    int datapos;
    InputIterator reci;
    for ( i = 0 ; i < pairs ; i++ ) {
      //      recmsg->get(datapos);
      getMessage(*recmsg, datapos);
      reci = (InputIterator)(recmsg->item(0).data());
      PETE_apply(op, t1[datapos], *reci);
      recmsg->get();    // cleans out the item without another copy
    }

    // Finished with this message.  Delete it if it is from another node; if
    // it is not, we sent it to ourselves and will delete it later.
    if ( fromnode != comm.myNode() )
      delete recmsg;
    notReceived--;
  }

  // at the end, delete the scatter messages, and return success
  delete [] msg;

  //INCIPPLSTAT(incScatters);
  return true;
}


// same as above, but this uses the default Communicate object
template <class InputIterator, class RandomIterator, class ScatterOp>
bool scatter(InputIterator s1, InputIterator s2,
             RandomIterator t1, int *target_node,
             int *target_position, const ScatterOp& op) {

  TAU_TYPE_STRING(profileString, "bool (" + CT(s1) + ", "
    + CT(s2) + ", " + CT(t1) + ", " + "int *, int *, " + CT(op) + ")");
  TAU_PROFILE("scatter()", profileString, TAU_MESSAGE);

  return scatter(*Ippl::Comm, s1, s2, t1, target_node, target_position, op);
}
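
// --------------------------------------------------------------------
// Illustrative usage sketch (an addition, not part of the original
// IPPL source): each node sends two values into a distributed target
// array.  OpAssign (the plain PETE assignment tag) is assumed here to
// resolve collisions, so the last value scattered into a slot wins;
// an accumulating tag could combine colliding values instead.
static void example_scatter()
{
  double src[2]    = { 3.5, 4.5 };  // this node's outgoing values
  int    node[2]   = { 0, 0 };      // destination node of each value
  int    pos[2]    = { 5, 0 };      // index in that node's target array
  double target[8] = { 0.0 };       // this node's piece of the target

  // Collective call: every node must call scatter, even if it has no
  // data to send, since each node waits for a message from every other.
  scatter(src, src + 2, target, node, pos, OpAssign());
}
// --------------------------------------------------------------------
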

/***************************************************************************
 * $RCSfile: GlobalComm.cpp,v $   $Author: adelmann $
 * $Revision: 1.1.1.1 $   $Date: 2003/01/23 07:40:28 $
 * IPPL_VERSION_ID: $Id: GlobalComm.cpp,v 1.1.1.1 2003/01/23 07:40:28 adelmann Exp $
 ***************************************************************************/
