OPAL (Object Oriented Parallel Accelerator Library) 2022.1
OPAL
GlobalComm.hpp
Go to the documentation of this file.
1// -*- C++ -*-
2/***************************************************************************
3 *
4 * The IPPL Framework
5 *
6 * This program was prepared by PSI.
7 * All rights in the program are reserved by PSI.
8 * Neither PSI nor the author(s)
9 * makes any warranty, express or implied, or assumes any liability or
10 * responsibility for the use of this software
11 *
12 * Visit www.amas.web.psi.ch for more details
13 *
14 ***************************************************************************/
15
16// -*- C++ -*-
17/***************************************************************************
18 *
19 * The IPPL Framework
20 *
21 *
22 * Visit http://people.web.psi.ch/adelmann/ for more details
23 *
24 ***************************************************************************/
25
26// include files
27#include "Message/GlobalComm.h"
28#include "Message/Communicate.h"
29#include "Message/Message.h"
30#include "Message/Tags.h"
31#include "Utility/IpplInfo.h"
32#include "Utility/IpplStats.h"
33#include "Message/DataTypes.h"
34#include "Message/Operations.h"
35
36#include <algorithm>
37
39// Reduce equally-sized arrays across the machine, by sending to node
40// 0 and broadcasting back the result. The arguments are two begin,end
41// iterators for the source of the data, an iterator pointing to
42// where the summed data should go, and an operation to perform in
43// the reduction ... this last should be from PETE, e.g., OpAdd, etc.
44// template classes found in either ../Expression/Applicative.h or
45// ../Expression/TypeComputations.h. The are simple classes such as
46// OpAnd(), OpMax(), OpAdd(), OpMultiply(), etc....
47// Return success.
48// The final argument indicates whether the LOCAL NODE should have it's
49// values included in the reduction (by default, this is true). If this
50// pointer to the boolean array is null, all the values will be included.
51// NOTE: The input iterators must iterate over simple data objects,
52// which do not require their own special getMessage/putMessage. If you
53// need to reduce a complex quantity, use the scalar version of reduce.
54template <class InputIterator, class OutputIterator, class ReduceOp>
55bool reduce(Communicate& comm, InputIterator s1, InputIterator s2,
56 OutputIterator t1, const ReduceOp& op, bool *IncludeVal)
57{
58
59
60 // Inform dbgmsg("reduce-vector", INFORM_ALL_NODES);
61
62 // determine destination node and tags
63 int parent = 0;
66
67 // determine how many elements we have to reduce
68 unsigned int elements = 0;
69 for (InputIterator tmps1 = s1; tmps1 != s2; ++tmps1, ++elements);
70 if (elements == 0)
71 {
72 ERRORMSG("reduce: nothing to reduce." << endl);
73 }
74
75 // create flags, if they are not provided
76 bool *useFlags = IncludeVal;
77 if (useFlags == 0 && elements > 0)
78 {
79 useFlags = new bool[elements];
80 for (unsigned int u=0; u < elements; useFlags[u++] = true);
81 }
82
83 if ( comm.myNode() != parent )
84 {
85 // send the source data to node 0 if we are not node 0
86 Message *msg = new Message;
87 // dbgmsg << "sending message with " << elements << " elements to node ";
88 // dbgmsg << parent << " with tag " << sendtag << endl;
90 if (elements > 0)
91 {
92 ::putMessage(*msg, s1, s2);
93 ::putMessage(*msg, useFlags, useFlags + elements);
94 }
95 if ( ! comm.send(msg, parent, sendtag) )
96 {
97 Ippl::abort("reduce: cannot send reduce buffers.");
98 }
99
100 // then we get the results back
101 msg = comm.receive_block(parent, rectag);
102 // dbgmsg << "received message with size = " << msg->size();
103 // dbgmsg << " from node " << parent << " with tag " << rectag << endl;
104 if ( ! msg || msg->size() < 1 )
105 Ippl::abort("reduce: cannot receive reduce results.");
106 getMessage(*msg, *t1);
107 delete msg;
108
109 }
110 else
111 {
112 // first copy the source into the target; this is like receiving
113 // from ourselves
114 InputIterator tmp1 = s1;
115 OutputIterator t2 = t1;
116 bool* copyf = useFlags;
117 for ( ; tmp1 != s2; ++tmp1, ++t2, ++copyf)
118 if (*copyf)
119 *t2 = *tmp1;
120
121 // the parent receives all the messages and then broadcasts the
122 // reduced result
123 int notReceived = comm.getNodes() - 1;
124 while (notReceived > 0)
125 {
126 // receive message
127 int fromnode = COMM_ANY_NODE;
128 Message *recmsg = comm.receive_block(fromnode, sendtag);
129 // dbgmsg << "received message with size = " << recmsg->size();
130 // dbgmsg << " from node " << fromnode << " with tag "<<sendtag<<endl;
131 if ( ! recmsg || recmsg->size() < 1 )
132 Ippl::abort("reduce: cannot receive reduce buffers.");
133
134 // get data from message
135 int recelems;
136 ::getMessage(*recmsg, recelems);
137 if ((unsigned int) recelems != elements)
138 Ippl::abort("reduce: mismatched element count in vector reduction.");
139 if (elements > 0)
140 {
141 InputIterator reci = (InputIterator)(recmsg->item(0).data());
142 bool *recflag = (bool *)(recmsg->item(1).data());
143
144 // the target buffer must have size >= size of the source, so
145 // we can iterate over the source iterator
146 unsigned int u;
147 for (u=0, t2=t1; u < elements; ++t2, ++reci, ++u)
148 {
149 if (recflag[u])
150 {
151 if (useFlags[u])
152 {
153 PETE_apply(op, *t2, *reci);
154 }
155 else
156 {
157 *t2 = *reci;
158 useFlags[u] = true;
159 }
160 }
161 }
162 }
163
164 // finished with this node's data
165 delete recmsg;
166 notReceived--;
167 }
168
169 // Finally, broadcast the results out. t2 should now point to the
170 // end of the target buffer.
171 if (comm.getNodes() > 1)
172 {
173 Message *sendmsg = new Message();
174 putMessage(*sendmsg, t1, t2);
175 // dbgmsg << "sending message with size " << sendmsg->size();
176 // dbgmsg << " to all nodes with tag " << rectag << endl;
177 if (comm.broadcast_others(sendmsg, rectag) != (comm.getNodes() - 1))
178 Ippl::abort("reduce: cannot send reduce results.");
179 }
180 }
181
182 // we're done
183 if (useFlags != 0 && useFlags != IncludeVal)
184 delete [] useFlags;
185
186 //INCIPPLSTAT(incReductions);
187 return true;
188}
189
190
192// same as above, but this uses the default Communicate object
193template <class InputIterator, class OutputIterator, class ReduceOp>
194bool reduce(InputIterator s1, InputIterator s2,
195 OutputIterator t1, const ReduceOp& op, bool *IncludeVal)
196{
197 return reduce(*Ippl::Comm, s1, s2, t1, op, IncludeVal);
198}
199
200
202// masked scalar versions of reduce ... instead of iterators, these versions
203// expect a single quantity to reduce and a location to place the result.
204// The final argument indicates whether the LOCAL NODE should have it's
205// value included in the reduction (by default, this is true).
206// Return success of operation.
207template <class T, class ReduceOp>
208bool reduce_masked(Communicate& comm, T& input, T& output,
209 const ReduceOp& op, bool IncludeVal)
210{
211
212 // Inform dbgmsg("reduce_masked", INFORM_ALL_NODES);
213
214 // determine destination node and tags
215 int parent = 0;
218
219 if (comm.myNode() != parent)
220 {
221 // send the source data to node 0 if we are not node 0
222 Message *msg = new Message;
223 // dbgmsg << "sending message, includeflag=" << IncludeVal << ", to node ";
224 // dbgmsg << parent << " with tag " << sendtag << endl;
225 ::putMessage(*msg, IncludeVal);
226 if (IncludeVal)
227 ::putMessage(*msg, input);
228 if ( ! comm.send(msg, parent, sendtag) )
229 {
230 Ippl::abort("reduce: cannot send reduce scalar.");
231 }
232
233 // then we get the results back
234 msg = comm.receive_block(parent, rectag);
235 // dbgmsg << "received message with size = " << msg->size();
236 // dbgmsg << " from node " << parent << " with tag " << rectag << endl;
237 if ( ! msg || msg->size() < 1 )
238 Ippl::abort("reduce: cannot receive reduce results.");
239 getMessage(*msg, output);
240 delete msg;
241
242 }
243 else
244 {
245 // first copy the source into the target; this is like receiving
246 // from ourselves
247 if (IncludeVal)
248 output = input;
249
250 // if there are several nodes, we must get the other results
251 if (comm.getNodes() > 1)
252 {
253
254 // the parent receives all the messages and then broadcasts the
255 // reduced result
256 int notReceived = comm.getNodes() - 1;
257
258 // create a temporary array to store values from other nodes
259 T *recval = new T[notReceived];
260 bool *recflag = new bool[notReceived];
261
262 // get all messages
263 while (notReceived > 0)
264 {
265 // receive message
266 int fromnode = COMM_ANY_NODE;
267 Message *recmsg = comm.receive_block(fromnode, sendtag);
268 if ( ! recmsg || recmsg->size() < 1 )
269 Ippl::abort("reduce: cannot receive reduce buffers.");
270
271 // get flag indicating if the message has any data; if it does,
272 // get it and store it
273 ::getMessage(*recmsg, recflag[fromnode - 1]);
274 if (recflag[fromnode - 1])
275 ::getMessage(*recmsg, recval[fromnode - 1]);
276
277 // finished with this node's data
278 delete recmsg;
279 notReceived--;
280 }
281
282 // now loop through the received values and do the reduction
283 for (int n=1; n < comm.getNodes(); ++n)
284 {
285 if (recflag[n-1])
286 {
287 if (IncludeVal)
288 {
289 PETE_apply(op, output, recval[n-1]);
290 }
291 else
292 {
293 output = recval[n-1];
294 IncludeVal = true;
295 }
296 }
297 }
298
299 // done with the temporary storage
300 delete [] recflag;
301 delete [] recval;
302 }
303
304 // Finally, broadcast the results out. t2 should now point to the
305 // end of the target buffer.
306 if (comm.getNodes() > 1)
307 {
308 Message *sendmsg = new Message();
309 ::putMessage(*sendmsg, output);
310 // dbgmsg << "sending message with size " << sendmsg->size();
311 // dbgmsg << " to all nodes with tag " << rectag << endl;
312 if (comm.broadcast_others(sendmsg, rectag) != (comm.getNodes() - 1))
313 Ippl::abort("reduce: cannot send reduce results.");
314 }
315
316 // we're done ... but do a check to see that we reduced SOMETHING
317 /* ADA: can be "savely" ignored ...
318 if (!IncludeVal)
319 {
320 WARNMSG("reduce: there was nothing to reduce, since the masks ");
321 WARNMSG("were all false." << endl);
322 }
323 */
324 }
325
326 //INCIPPLSTAT(incReductions);
327 return true;
328}
329
330
332// same as above, but this uses the default Communicate object
333template <class T, class ReduceOp>
334bool reduce_masked(T& input, T& output, const ReduceOp& op,
335 bool IncludeVal)
336{
337
338 return reduce_masked(*Ippl::Comm, input, output, op, IncludeVal);
339}
340
341
343// Scatter the data in the given source container to all other nodes.
344// The data is read using the first two begin,end iterators, and written
345// to the location indicated by the third iterator. The next two
346// arrays are for target nodes and target indices for the data in the
347// source array; they should be of the same length as the source array.
348// the final argument is an STL predicate which is used to combine data
349// when two or more items are scattered into the same location on the
350// same node.
351// Return success of operation.
352template <class InputIterator, class RandomIterator, class ScatterOp>
353bool scatter(Communicate& comm, InputIterator s1, InputIterator s2,
354 RandomIterator t1, int *target_node,
355 int *target_position, const ScatterOp& op)
356{
357
358 int i; // loop variables
360
361 // Create a number of send messages equal to TotalNodes
362 // these messages will be packed with the data from the source
363 // data and sent to the node indicated by target node array
364 // some empty messages will be sent so the recieving node knows when
365 // it has recieved all the messages
366 Message* msg = new Message[comm.getNodes()];
367
368 // Loop over each item of the source array and pack the send messages.
369 // The message is packed in pairs, the first element of each pair is
370 // an integer representing the array offset in the target. The second
371 // element is the data to be placed in that offset.
372 int *tn = target_node;
373 int *tp = target_position;
374 InputIterator si;
375 for ( si = s1; si != s2 ; si++, tn++, tp++ )
376 {
377 if ( *tn < 0 || *tn >= comm.getNodes() )
378 {
379 ERRORMSG("scatter: bad scatter target " << *tn << endl);
380 return false;
381 }
382 // msg[*tn].put(*tp).put(*si);
383 putMessage(msg[*tn], *tp);
384 putMessage(msg[*tn], *si);
385 }
386
387 // Send out the messages. We do not delete the messages here after the
388 // send, however.
389 for ( i = comm.getNodes() - 1; i >= 0; i-- )
390 {
391 if ( ! comm.send(msg + i, i, tag, false) )
392 {
393 ERRORMSG("scatter: cannot send scatter buffer " << i << endl);
394 return false;
395 }
396 }
397
398 // Receive the scatter messages back now.
399 int notReceived = comm.getNodes();
400 while (notReceived > 0)
401 {
402 int fromnode = COMM_ANY_NODE;
403 Message *recmsg = comm.receive_block(fromnode, tag);
404 if ( ! recmsg )
405 {
406 ERRORMSG("scatter: cannot receive scatter message." << endl);
407 return false;
408 }
409
410 // for each (pos, val) pair, get it and put results in target storage
411 int pairs = recmsg->size() / 2;
412 int datapos;
413 InputIterator reci;
414 for ( i = 0 ; i < pairs ; i++ )
415 {
416 // recmsg->get(datapos);
417 getMessage(*recmsg, datapos);
418 reci = (InputIterator)(recmsg->item(0).data());
419 PETE_apply(op, t1[datapos], *reci);
420 recmsg->get(); // cleans out the item without another copy
421 }
422
423 // Finished with this message. Delete it if it is from another node; if
424 // it is not, we sent it to ourselves and will delete it later.
425 if ( fromnode != comm.myNode() )
426 delete recmsg;
427 notReceived--;
428 }
429
430 // at the end, delete the scatter messages, and return success
431 delete [] msg;
432
433 //INCIPPLSTAT(incScatters);
434 return true;
435}
436
437
438// same as above, but this uses the default Communicate object
439template <class InputIterator, class RandomIterator, class ScatterOp>
440bool scatter(InputIterator s1, InputIterator s2,
441 RandomIterator t1, int *target_node,
442 int *target_position, const ScatterOp& op)
443{
444
445 return scatter(*Ippl::Comm, s1, s2, t1, target_node, target_position, op);
446}
447
448template <typename T>
449void gather(const T* input, T* output, int count, int root) {
450 MPI_Datatype type = get_mpi_datatype<T>(*input);
451
452 MPI_Gather(const_cast<T*>(input), count, type,
453 output, count, type, root, Ippl::getComm());
454}
455
456
457template <typename T>
458void scatter(const T* input, T* output, int count, int root) {
459 MPI_Datatype type = get_mpi_datatype<T>(*input);
460
461 MPI_Scatter(const_cast<T*>(input), count, type,
462 output, count, type, root, Ippl::getComm());
463}
464
465
466template <typename T, class Op>
467void reduce(const T* input, T* output, int count, Op op, int root) {
468 MPI_Datatype type = get_mpi_datatype<T>(*input);
469
470 MPI_Op mpiOp = get_mpi_op<Op>(op);
471
472 MPI_Reduce(const_cast<T*>(input), output, count, type,
473 mpiOp, root, Ippl::getComm());
474}
475
// Duplicate of reduce(const T*, T*, int, Op, int) -- the body was a
// byte-for-byte copy of that overload.  Delegate to it instead of
// repeating the MPI_Reduce call so the two cannot drift apart.
// Signature kept unchanged for backward compatibility.
template <typename T, class Op>
void new_reduce(const T* input, T* output, int count, Op op, int root) {
    reduce(input, output, count, op, root);
}
485
486
487template <typename T, class Op>
488void new_reduce(T* inout, int count, Op op, int root) {
489 MPI_Datatype type = get_mpi_datatype<T>(*inout);
490
491 MPI_Op mpiOp = get_mpi_op<Op>(op);
492
493 if (Ippl::myNode() == root) {
494 MPI_Reduce(MPI_IN_PLACE, inout, count, type,
495 mpiOp, root, Ippl::getComm());
496 } else {
497 MPI_Reduce(inout, inout, count, type,
498 mpiOp, root, Ippl::getComm());
499 }
500}
501
502
// Scalar convenience overload: reduces 'count' elements starting at the
// address of 'input' into 'output' on node 'root'.
template <typename T, class Op>
void reduce(const T& input, T& output, int count, Op op, int root) {
    reduce(&input, &output, count, op, root);
}
507
508
509template <typename T, class Op>
510void allreduce(const T* input, T* output, int count, Op op) {
511 MPI_Datatype type = get_mpi_datatype<T>(*input);
512
513 MPI_Op mpiOp = get_mpi_op<Op>(op);
514
515 MPI_Allreduce(const_cast<T*>(input), output, count, type,
516 mpiOp, Ippl::getComm());
517}
518
// Scalar convenience overload of allreduce(): reduces 'count' elements
// starting at the address of 'input' into 'output' on every node.
template <typename T, class Op>
void allreduce(const T& input, T& output, int count, Op op) {
    allreduce(&input, &output, count, op);
}
523
524
525template <typename T, class Op>
526void allreduce(T* inout, int count, Op op) {
527 MPI_Datatype type = get_mpi_datatype<T>(*inout);
528
529 MPI_Op mpiOp = get_mpi_op<Op>(op);
530
531 MPI_Allreduce(MPI_IN_PLACE, inout, count, type,
532 mpiOp, Ippl::getComm());
533}
534
535
// Scalar convenience overload of the in-place allreduce().
template <typename T, class Op>
void allreduce(T& inout, int count, Op op) {
    allreduce(&inout, count, op);
}
elements
Definition: IndexMap.cpp:163
void putMessage(Message &m, const T &t)
Definition: Message.h:549
void getMessage(Message &m, T &t)
Definition: Message.h:572
void allreduce(const T *input, T *output, int count, Op op)
Definition: GlobalComm.hpp:510
bool reduce_masked(Communicate &comm, T &input, T &output, const ReduceOp &op, bool IncludeVal)
Definition: GlobalComm.hpp:208
bool reduce(Communicate &comm, InputIterator s1, InputIterator s2, OutputIterator t1, const ReduceOp &op, bool *IncludeVal)
Definition: GlobalComm.hpp:55
bool scatter(Communicate &comm, InputIterator s1, InputIterator s2, RandomIterator t1, int *target_node, int *target_position, const ScatterOp &op)
Definition: GlobalComm.hpp:353
void gather(const T *input, T *output, int count, int root)
Definition: GlobalComm.hpp:449
void new_reduce(const T *input, T *output, int count, Op op, int root)
Definition: GlobalComm.hpp:477
const int COMM_ANY_NODE
Definition: Communicate.h:40
#define COMM_REDUCE_SEND_TAG
Definition: Tags.h:32
#define COMM_REDUCE_CYCLE
Definition: Tags.h:35
#define COMM_REDUCE_RECV_TAG
Definition: Tags.h:33
#define COMM_REDUCE_SCATTER_TAG
Definition: Tags.h:34
void PETE_apply(const OpPeriodic< T > &, T &a, const T &b)
Definition: BCond.hpp:353
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
#define ERRORMSG(msg)
Definition: IpplInfo.h:350
boost::function< boost::tuple< double, bool >(arguments_t)> type
Definition: function.hpp:21
bool send(Message *, int node, int tag, bool delmsg=true)
virtual int broadcast_others(Message *, int, bool delmsg=true)
Message * receive_block(int &node, int &tag)
int getNodes() const
Definition: Communicate.h:143
int myNode() const
Definition: Communicate.h:155
size_t size() const
Definition: Message.h:292
MsgItem & item(size_t n)
Definition: Message.h:308
Message & get(const T &cval)
Definition: Message.h:476
void * data()
Definition: Message.h:244
int next_tag(int t, int s=1000)
Definition: TagMaker.h:39
static void abort(const char *=0)
Definition: IpplInfo.cpp:616
static MPI_Comm getComm()
Definition: IpplInfo.h:152
static int myNode()
Definition: IpplInfo.cpp:691
static Communicate * Comm
Definition: IpplInfo.h:84