OPAL (Object Oriented Parallel Accelerator Library)  2.2.0
GlobalComm.hpp
// -*- C++ -*-
/***************************************************************************
 *
 * The IPPL Framework
 *
 * This program was prepared by PSI.
 * All rights in the program are reserved by PSI.
 * Neither PSI nor the author(s)
 * makes any warranty, express or implied, or assumes any liability or
 * responsibility for the use of this software
 *
 * Visit www.amas.web.psi for more details
 *
 ***************************************************************************/

// -*- C++ -*-
/***************************************************************************
 *
 * The IPPL Framework
 *
 *
 * Visit http://people.web.psi.ch/adelmann/ for more details
 *
 ***************************************************************************/

// include files
#include "Message/GlobalComm.h"
#include "Message/Communicate.h"
#include "Message/Message.h"
#include "Message/Tags.h"
#include "Utility/IpplInfo.h"
#include "Utility/IpplStats.h"
#include "Message/DataTypes.h"
#include "Message/Operations.h"

#include <algorithm>

// Reduce equally-sized arrays across the machine, by sending to node
// 0 and broadcasting back the result. The arguments are two begin,end
// iterators for the source of the data, an iterator pointing to
// where the summed data should go, and an operation to perform in
// the reduction; this last should be one of the PETE template classes
// found in either ../Expression/Applicative.h or
// ../Expression/TypeComputations.h. These are simple classes such as
// OpAnd(), OpMax(), OpAdd(), OpMultiply(), etc.
// Return success.
// The final argument indicates whether the LOCAL NODE should have its
// values included in the reduction (by default, this is true). If this
// pointer to the boolean array is null, all the values will be included.
// NOTE: The input iterators must iterate over simple data objects,
// which do not require their own special getMessage/putMessage. If you
// need to reduce a complex quantity, use the scalar version of reduce.
// (An illustrative usage sketch follows the two reduce() definitions below.)
template <class InputIterator, class OutputIterator, class ReduceOp>
bool reduce(Communicate& comm, InputIterator s1, InputIterator s2,
            OutputIterator t1, const ReduceOp& op, bool *IncludeVal)
{

    // Inform dbgmsg("reduce-vector", INFORM_ALL_NODES);

    // determine destination node and tags
    int parent = 0;
    int sendtag = comm.next_tag(COMM_REDUCE_SEND_TAG, COMM_REDUCE_CYCLE);
    int rectag = comm.next_tag(COMM_REDUCE_RECV_TAG, COMM_REDUCE_CYCLE);

    // determine how many elements we have to reduce
    unsigned int elements = 0;
    for (InputIterator tmps1 = s1; tmps1 != s2; ++tmps1, ++elements);
    if (elements == 0)
    {
        ERRORMSG("reduce: nothing to reduce." << endl);
    }

    // create flags, if they are not provided
    bool *useFlags = IncludeVal;
    if (useFlags == 0 && elements > 0)
    {
        useFlags = new bool[elements];
        for (unsigned int u=0; u < elements; useFlags[u++] = true);
    }

    if ( comm.myNode() != parent )
    {
        // send the source data to node 0 if we are not node 0
        Message *msg = new Message;
        // dbgmsg << "sending message with " << elements << " elements to node ";
        // dbgmsg << parent << " with tag " << sendtag << endl;
        ::putMessage(*msg, elements);
        if (elements > 0)
        {
            ::putMessage(*msg, s1, s2);
            ::putMessage(*msg, useFlags, useFlags + elements);
        }
        if ( ! comm.send(msg, parent, sendtag) )
        {
            Ippl::abort("reduce: cannot send reduce buffers.");
        }

        // then we get the results back
        msg = comm.receive_block(parent, rectag);
        // dbgmsg << "received message with size = " << msg->size();
        // dbgmsg << " from node " << parent << " with tag " << rectag << endl;
        if ( ! msg || msg->size() < 1 )
            Ippl::abort("reduce: cannot receive reduce results.");
        getMessage(*msg, *t1);
        delete msg;

    }
    else
    {
        // first copy the source into the target; this is like receiving
        // from ourselves
        InputIterator tmp1 = s1;
        OutputIterator t2 = t1;
        bool* copyf = useFlags;
        for ( ; tmp1 != s2; ++tmp1, ++t2, ++copyf)
            if (*copyf)
                *t2 = *tmp1;

        // the parent receives all the messages and then broadcasts the
        // reduced result
        int notReceived = comm.getNodes() - 1;
        while (notReceived > 0)
        {
            // receive message
            int fromnode = COMM_ANY_NODE;
            Message *recmsg = comm.receive_block(fromnode, sendtag);
            // dbgmsg << "received message with size = " << recmsg->size();
            // dbgmsg << " from node " << fromnode << " with tag " << sendtag << endl;
            if ( ! recmsg || recmsg->size() < 1 )
                Ippl::abort("reduce: cannot receive reduce buffers.");

            // get data from message
            int recelems;
            ::getMessage(*recmsg, recelems);
            if ((unsigned int) recelems != elements)
                Ippl::abort("reduce: mismatched element count in vector reduction.");
            if (elements > 0)
            {
                InputIterator reci = (InputIterator)(recmsg->item(0).data());
                bool *recflag = (bool *)(recmsg->item(1).data());

                // the target buffer must have size >= size of the source, so
                // we can iterate over the source iterator
                unsigned int u;
                for (u=0, t2=t1; u < elements; ++t2, ++reci, ++u)
                {
                    if (recflag[u])
                    {
                        if (useFlags[u])
                        {
                            PETE_apply(op, *t2, *reci);
                        }
                        else
                        {
                            *t2 = *reci;
                            useFlags[u] = true;
                        }
                    }
                }
            }

            // finished with this node's data
            delete recmsg;
            notReceived--;
        }

        // Finally, broadcast the results out. t2 should now point to the
        // end of the target buffer.
        if (comm.getNodes() > 1)
        {
            Message *sendmsg = new Message();
            putMessage(*sendmsg, t1, t2);
            // dbgmsg << "sending message with size " << sendmsg->size();
            // dbgmsg << " to all nodes with tag " << rectag << endl;
            if (comm.broadcast_others(sendmsg, rectag) != (comm.getNodes() - 1))
                Ippl::abort("reduce: cannot send reduce results.");
        }
    }

    // we're done
    if (useFlags != 0 && useFlags != IncludeVal)
        delete [] useFlags;

    //INCIPPLSTAT(incReductions);
    return true;
}


// same as above, but this uses the default Communicate object
template <class InputIterator, class OutputIterator, class ReduceOp>
bool reduce(InputIterator s1, InputIterator s2,
            OutputIterator t1, const ReduceOp& op, bool *IncludeVal)
{
    return reduce(*Ippl::Comm, s1, s2, t1, op, IncludeVal);
}
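
// --- Illustrative usage sketch (not part of the original source) ---
// Each node contributes a local array of partial values; after the call every
// node holds the elementwise totals. OpAddAssign() is assumed here to be the
// PETE accumulation functor provided by this build (the comment above names
// OpAnd, OpMax, OpAdd, etc.); substitute whichever operation is required.
inline void exampleVectorReduce()
{
    double local[4]  = { 1.0, 2.0, 3.0, 4.0 };   // this node's contribution
    double global[4] = { 0.0, 0.0, 0.0, 0.0 };   // receives the reduced totals
    reduce(local, local + 4, global, OpAddAssign());
}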


// masked scalar versions of reduce ... instead of iterators, these versions
// expect a single quantity to reduce and a location to place the result.
// The final argument indicates whether the LOCAL NODE should have its
// value included in the reduction (by default, this is true).
// Return success of operation.
// (An illustrative usage sketch follows the two reduce_masked() definitions below.)
template <class T, class ReduceOp>
bool reduce_masked(Communicate& comm, T& input, T& output,
                   const ReduceOp& op, bool IncludeVal)
{

    // Inform dbgmsg("reduce_masked", INFORM_ALL_NODES);

    // determine destination node and tags
    int parent = 0;
    int sendtag = comm.next_tag(COMM_REDUCE_SEND_TAG, COMM_REDUCE_CYCLE);
    int rectag = comm.next_tag(COMM_REDUCE_RECV_TAG, COMM_REDUCE_CYCLE);

    if (comm.myNode() != parent)
    {
        // send the source data to node 0 if we are not node 0
        Message *msg = new Message;
        // dbgmsg << "sending message, includeflag=" << IncludeVal << ", to node ";
        // dbgmsg << parent << " with tag " << sendtag << endl;
        ::putMessage(*msg, IncludeVal);
        if (IncludeVal)
            ::putMessage(*msg, input);
        if ( ! comm.send(msg, parent, sendtag) )
        {
            Ippl::abort("reduce: cannot send reduce scalar.");
        }

        // then we get the results back
        msg = comm.receive_block(parent, rectag);
        // dbgmsg << "received message with size = " << msg->size();
        // dbgmsg << " from node " << parent << " with tag " << rectag << endl;
        if ( ! msg || msg->size() < 1 )
            Ippl::abort("reduce: cannot receive reduce results.");
        getMessage(*msg, output);
        delete msg;

    }
    else
    {
        // first copy the source into the target; this is like receiving
        // from ourselves
        if (IncludeVal)
            output = input;

        // if there are several nodes, we must get the other results
        if (comm.getNodes() > 1)
        {

            // the parent receives all the messages and then broadcasts the
            // reduced result
            int notReceived = comm.getNodes() - 1;

            // create a temporary array to store values from other nodes
            T *recval = new T[notReceived];
            bool *recflag = new bool[notReceived];

            // get all messages
            while (notReceived > 0)
            {
                // receive message
                int fromnode = COMM_ANY_NODE;
                Message *recmsg = comm.receive_block(fromnode, sendtag);
                if ( ! recmsg || recmsg->size() < 1 )
                    Ippl::abort("reduce: cannot receive reduce buffers.");

                // get flag indicating if the message has any data; if it does,
                // get it and store it
                ::getMessage(*recmsg, recflag[fromnode - 1]);
                if (recflag[fromnode - 1])
                    ::getMessage(*recmsg, recval[fromnode - 1]);

                // finished with this node's data
                delete recmsg;
                notReceived--;
            }

            // now loop through the received values and do the reduction
            for (int n=1; n < comm.getNodes(); ++n)
            {
                if (recflag[n-1])
                {
                    if (IncludeVal)
                    {
                        PETE_apply(op, output, recval[n-1]);
                    }
                    else
                    {
                        output = recval[n-1];
                        IncludeVal = true;
                    }
                }
            }

            // done with the temporary storage
            delete [] recflag;
            delete [] recval;
        }

        // Finally, broadcast the result out to the other nodes.
        if (comm.getNodes() > 1)
        {
            Message *sendmsg = new Message();
            ::putMessage(*sendmsg, output);
            // dbgmsg << "sending message with size " << sendmsg->size();
            // dbgmsg << " to all nodes with tag " << rectag << endl;
            if (comm.broadcast_others(sendmsg, rectag) != (comm.getNodes() - 1))
                Ippl::abort("reduce: cannot send reduce results.");
        }

        // we're done ... but do a check to see that we reduced SOMETHING
        /* ADA: can be "safely" ignored ...
        if (!IncludeVal)
        {
            WARNMSG("reduce: there was nothing to reduce, since the masks ");
            WARNMSG("were all false." << endl);
        }
        */
    }

    //INCIPPLSTAT(incReductions);
    return true;
}


// same as above, but this uses the default Communicate object
template <class T, class ReduceOp>
bool reduce_masked(T& input, T& output, const ReduceOp& op,
                   bool IncludeVal)
{

    return reduce_masked(*Ippl::Comm, input, output, op, IncludeVal);
}
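
// --- Illustrative usage sketch (not part of the original source) ---
// Each node offers one scalar, but only nodes whose mask is true contribute;
// every node ends up with the sum of the contributing values. OpAddAssign()
// is again assumed to be the available PETE accumulation functor.
inline void exampleMaskedReduce(double localValue, bool contributes)
{
    double total = 0.0;
    reduce_masked(localValue, total, OpAddAssign(), contributes);
}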


// Scatter the data in the given source container to all other nodes.
// The data is read using the first two begin,end iterators, and written
// to the location indicated by the third iterator. The next two
// arrays are for target nodes and target indices for the data in the
// source array; they should be of the same length as the source array.
// The final argument is an STL predicate which is used to combine data
// when two or more items are scattered into the same location on the
// same node.
// Return success of operation.
// (An illustrative usage sketch follows the iterator-based scatter() definitions below.)
template <class InputIterator, class RandomIterator, class ScatterOp>
bool scatter(Communicate& comm, InputIterator s1, InputIterator s2,
             RandomIterator t1, int *target_node,
             int *target_position, const ScatterOp& op)
{

    int i; // loop variable

    // determine the tag to use for the scatter messages
    int tag = comm.next_tag(COMM_REDUCE_SCATTER_TAG, COMM_REDUCE_CYCLE);

    // Create a number of send messages equal to TotalNodes.
    // These messages will be packed with the data from the source
    // array and sent to the node indicated by the target node array.
    // Some empty messages will be sent so the receiving node knows when
    // it has received all the messages.
    Message* msg = new Message[comm.getNodes()];

    // Loop over each item of the source array and pack the send messages.
    // The message is packed in pairs, the first element of each pair is
    // an integer representing the array offset in the target. The second
    // element is the data to be placed in that offset.
    int *tn = target_node;
    int *tp = target_position;
    InputIterator si;
    for ( si = s1; si != s2 ; si++, tn++, tp++ )
    {
        if ( *tn < 0 || *tn >= comm.getNodes() )
        {
            ERRORMSG("scatter: bad scatter target " << *tn << endl);
            return false;
        }
        // msg[*tn].put(*tp).put(*si);
        putMessage(msg[*tn], *tp);
        putMessage(msg[*tn], *si);
    }

    // Send out the messages. We do not delete the messages here after the
    // send, however.
    for ( i = comm.getNodes() - 1; i >= 0; i-- )
    {
        if ( ! comm.send(msg + i, i, tag, false) )
        {
            ERRORMSG("scatter: cannot send scatter buffer " << i << endl);
            return false;
        }
    }

    // Receive the scatter messages back now.
    int notReceived = comm.getNodes();
    while (notReceived > 0)
    {
        int fromnode = COMM_ANY_NODE;
        Message *recmsg = comm.receive_block(fromnode, tag);
        if ( ! recmsg )
        {
            ERRORMSG("scatter: cannot receive scatter message." << endl);
            return false;
        }

        // for each (pos, val) pair, get it and put results in target storage
        int pairs = recmsg->size() / 2;
        int datapos;
        InputIterator reci;
        for ( i = 0 ; i < pairs ; i++ )
        {
            // recmsg->get(datapos);
            getMessage(*recmsg, datapos);
            reci = (InputIterator)(recmsg->item(0).data());
            PETE_apply(op, t1[datapos], *reci);
            recmsg->get(); // cleans out the item without another copy
        }

        // Finished with this message. Delete it if it is from another node; if
        // it is not, we sent it to ourselves and will delete it later.
        if ( fromnode != comm.myNode() )
            delete recmsg;
        notReceived--;
    }

    // at the end, delete the scatter messages, and return success
    delete [] msg;

    //INCIPPLSTAT(incScatters);
    return true;
}


// same as above, but this uses the default Communicate object
template <class InputIterator, class RandomIterator, class ScatterOp>
bool scatter(InputIterator s1, InputIterator s2,
             RandomIterator t1, int *target_node,
             int *target_position, const ScatterOp& op)
{

    return scatter(*Ippl::Comm, s1, s2, t1, target_node, target_position, op);
}
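
// --- Illustrative usage sketch (not part of the original source) ---
// Every node sends its two local values to slots 0 and 1 of the target buffer
// on node 0; values landing in the same slot are combined with the assumed
// PETE accumulation functor OpAddAssign().
inline void exampleScatter(Communicate& comm)
{
    double src[2]  = { 1.0, 2.0 };
    double dest[2] = { 0.0, 0.0 };
    int node[2] = { 0, 0 };    // both items are sent to node 0 ...
    int pos[2]  = { 0, 1 };    // ... into target positions 0 and 1
    scatter(comm, src, src + 2, dest, node, pos, OpAddAssign());
}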

// Gather count elements of type T from every rank into the contiguous
// output buffer on the root rank (thin wrapper around MPI_Gather).
template <typename T>
void gather(const T* input, T* output, int count, int root) {
    MPI_Datatype type = get_mpi_datatype<T>(*input);

    MPI_Gather(const_cast<T*>(input), count, type,
               output, count, type, root, Ippl::getComm());
}


// Scatter count elements of type T from the root rank's input buffer to the
// output buffer of every rank (thin wrapper around MPI_Scatter).
template <typename T>
void scatter(const T* input, T* output, int count, int root) {
    MPI_Datatype type = get_mpi_datatype<T>(*input);

    MPI_Scatter(const_cast<T*>(input), count, type,
                output, count, type, root, Ippl::getComm());
}


// Reduce count elements with the given operation; the result is available
// only on the root rank.
template <typename T, class Op>
void reduce(const T* input, T* output, int count, Op op, int root) {
    MPI_Datatype type = get_mpi_datatype<T>(*input);

    MPI_Op mpiOp = get_mpi_op<Op>(op);

    MPI_Reduce(const_cast<T*>(input), output, count, type,
               mpiOp, root, Ippl::getComm());
}

// Out-of-place reduction onto the root rank; the body is identical to the
// pointer-based reduce() above.
template <typename T, class Op>
void new_reduce(const T* input, T* output, int count, Op op, int root) {
    MPI_Datatype type = get_mpi_datatype<T>(*input);

    MPI_Op mpiOp = get_mpi_op<Op>(op);

    MPI_Reduce(const_cast<T*>(input), output, count, type,
               mpiOp, root, Ippl::getComm());
}


// In-place reduction onto the root rank: the root passes MPI_IN_PLACE as its
// send buffer, while the other ranks send inout (the receive argument is
// ignored by MPI_Reduce on non-root ranks).
template <typename T, class Op>
void new_reduce(T* inout, int count, Op op, int root) {
    MPI_Datatype type = get_mpi_datatype<T>(*inout);

    MPI_Op mpiOp = get_mpi_op<Op>(op);

    if (Ippl::myNode() == root) {
        MPI_Reduce(MPI_IN_PLACE, inout, count, type,
                   mpiOp, root, Ippl::getComm());
    } else {
        MPI_Reduce(inout, inout, count, type,
                   mpiOp, root, Ippl::getComm());
    }
}
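
// --- Illustrative usage sketch (not part of the original source) ---
// In-place sum of a small buffer onto rank 0, assuming std::plus<T> is among
// the functors that get_mpi_op maps to an MPI_Op (see Message/Operations.h);
// <functional> must be available for std::plus.
inline void exampleInPlaceReduce()
{
    double buf[3] = { 1.0, 2.0, 3.0 };
    new_reduce(buf, 3, std::plus<double>(), 0);   // rank 0 ends up with the sums
}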


// Convenience overload of reduce() taking references instead of pointers.
template <typename T, class Op>
void reduce(const T& input, T& output, int count, Op op, int root) {
    reduce(&input, &output, count, op, root);
}


// Reduce count elements with the given operation and make the result
// available on every rank.
template <typename T, class Op>
void allreduce(const T* input, T* output, int count, Op op) {
    MPI_Datatype type = get_mpi_datatype<T>(*input);

    MPI_Op mpiOp = get_mpi_op<Op>(op);

    MPI_Allreduce(const_cast<T*>(input), output, count, type,
                  mpiOp, Ippl::getComm());
}

// Convenience overload of allreduce() taking references instead of pointers.
template <typename T, class Op>
void allreduce(const T& input, T& output, int count, Op op) {
    allreduce(&input, &output, count, op);
}


// In-place variant: inout holds this rank's contribution on entry and the
// reduced result on return.
template <typename T, class Op>
void allreduce(T* inout, int count, Op op) {
    MPI_Datatype type = get_mpi_datatype<T>(*inout);

    MPI_Op mpiOp = get_mpi_op<Op>(op);

    MPI_Allreduce(MPI_IN_PLACE, inout, count, type,
                  mpiOp, Ippl::getComm());
}


// Convenience overload for reducing a single value in place.
template <typename T, class Op>
void allreduce(T& inout, int count, Op op) {
    allreduce(&inout, count, op);
}
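
// --- Illustrative usage sketch (not part of the original source) ---
// Elementwise sum of a local histogram across all ranks, using the in-place
// allreduce() overload above; std::plus<T> is assumed to be mapped by
// get_mpi_op (see Message/Operations.h).
inline void exampleAllreduce(double* histogram, int bins)
{
    allreduce(histogram, bins, std::plus<double>());
}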