// -*- C++ -*-
/***************************************************************************
 *
 * The IPPL Framework
 *
 * This program was prepared by PSI.
 * All rights in the program are reserved by PSI.
 * Neither PSI nor the author(s)
 * makes any warranty, express or implied, or assumes any liability or
 * responsibility for the use of this software
 *
 * Visit www.amas.web.psi for more details
 *
 ***************************************************************************/
// include files
#include "FieldLayout/BinaryBalancer.h"
#include "FieldLayout/FieldLayout.h"
#include "Field/BareField.h"
#include "Utility/PAssert.h"

/*

 Implementation of BinaryBalancer.

 The general strategy is that you do log(P) splits on the domain. It
 starts with the whole domain, does a reduction to find where to
 split it, then does reductions on each of the resulting domains to
 find where to split those, then reductions on those to split them,
 and so on until it is done.

 Suppose you're on the n'th split, so there are 2**n domains figured
 out so far, and after the split there will be 2**(n+1) domains. In
 each of those 2**n domains you need to find

 a) The axis to split on. This is done by just finding the longest
    axis in that domain.

 b) The location within that domain to make the split. This is done
    by reducing the weights on all dimensions except the axis to be
    split and finding the location within that array that puts half
    the weight on each side.

 The reduction for b) is done in a scalable way. It is a parallel
 reduction, and if there are 2**n domains being split, the reductions
 are accumulated onto processors 0..2**n-1. Those processors
 calculate the split locations and broadcast them to all the
 processors.

 At every stage of the process all the processors know all the
 domains. This is necessary because the weight array could be
 distributed arbitrarily, so the reductions could involve any
 processors.

 Nevertheless, the reductions are performed efficiently. Using
 DomainMaps, only the processors that need to participate in a
 reduction do participate.

*/
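
// A minimal, serial sketch of the same strategy on a 1-D weight array
// (illustrative only; the names below are hypothetical and not part of
// IPPL).  It repeatedly splits every region that still has more than one
// processor at the point that balances the cumulative weight, which is
// what the parallel code below does with FindCutAxis, the
// PerpReduce/LocalReduce reductions, and FindMedian.
#if 0
#include <cstddef>
#include <utility>
#include <vector>

// A region is a half-open index range [first,last) plus the number of
// processors still assigned to it.
struct SketchRegion { int first, last, nprocs; };

// Split one region so the left part gets roughly nprocs/2 of its weight.
static std::pair<SketchRegion,SketchRegion>
SketchSplit(const SketchRegion& r, const std::vector<double>& weight)
{
    int lprocs = r.nprocs / 2;
    double total = 0;
    for (int i = r.first; i < r.last; ++i)
        total += weight[i];
    double target = total * lprocs / r.nprocs, sum = 0;
    int cut = r.first;
    for (int i = r.first; i < r.last; ++i) {
        sum += weight[i];
        if (sum >= target) { cut = i + 1; break; }
    }
    SketchRegion left  = { r.first, cut,    lprocs };
    SketchRegion right = { cut,     r.last, r.nprocs - lprocs };
    return std::make_pair(left, right);
}

// Start with the whole range on P processors and keep sweeping over the
// regions until every one has a single processor: about log2(P) sweeps.
static std::vector<SketchRegion>
SketchBisect(const std::vector<double>& weight, int P)
{
    std::vector<SketchRegion> regions;
    SketchRegion whole = { 0, (int)weight.size(), P };
    regions.push_back(whole);
    bool again = true;
    while (again) {
        again = false;
        std::vector<SketchRegion> next;
        for (std::size_t i = 0; i < regions.size(); ++i) {
            if (regions[i].nprocs > 1) {
                std::pair<SketchRegion,SketchRegion> lr =
                    SketchSplit(regions[i], weight);
                next.push_back(lr.first);
                next.push_back(lr.second);
                again = true;
            }
            else
                next.push_back(regions[i]);
        }
        regions.swap(next);
    }
    return regions;
}
#endif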

//
// Given an NDIndex<Dim> find the axis with the longest length, that is
// not SERIAL.
//
template <unsigned Dim>
static int
FindCutAxis(const NDIndex<Dim> &domain, const FieldLayout<Dim> &layout)
{
    // CutAxis will be the dimension to cut.
    int cutAxis = -1;
    // MaxLength will have the maximum length of any dimension.
    unsigned int maxLength = 0;
    // Loop over dimension.
    for (unsigned int d = 0; d < Dim; ++d) {
        if (layout.getDistribution(d) != SERIAL ||
            layout.getRequestedDistribution(d) != SERIAL) {
            // Check if this axis is longer than the current max.
            unsigned int length = domain[d].length();
            if ( maxLength < length ) {
                // If so, remember it.
                cutAxis = d;
                maxLength = length;
            }
        }
    }

    // Make sure we found one.
    //PAssert_GE(cutAxis, 0);
    if (cutAxis < 0)
        throw BinaryRepartitionFailed();

    // Return the longest axis.
    return cutAxis;
}

//
// Find the median point in a container.
// The first two arguments are begin and end iterators.
// The third is a dummy argument of type T, where the
// container holds objects of type T.
//

template<class RandomIterator, class T>
static RandomIterator
FindMedian(int nprocs, RandomIterator begin, RandomIterator end, T)
{
    // If we have only one processor, cut at the left.
    if ( nprocs == 1 )
        return begin;

    // First find the total weight.
    T w = 0;
    int lprocs = nprocs/2;
    RandomIterator rp, median;
    for (rp = begin; rp != end; ++rp)
        w += *rp;

    // If the total weight is zero, we need to do things specially.
    if ( w == 0 )
    {
        // The total weight is zero.
        // Put about as much zero weight stuff on the left and the right.
        median = begin + ((end-begin)*lprocs)/nprocs;
    }
    else
    {
        // The total weight is nonzero.
        // Put equal amounts on the left and right processors.
        T w2 = (w*lprocs)/nprocs;
        // Find the point with half the weight to the left.
        bool found = false;
        w = T(0);
        for (rp = begin; (rp != end) && (!found); ++rp) {
            // Add the current element to the running total.
            w += *rp;
            if (w >= w2) {
                found = true;
                if ( (w-w2) > (*rp/T(2)) )
                    median = rp;
                else
                    median = (rp+1 != end) ? rp+1 : rp;
            }
        }
    }
    // Found it. Exit.
    return median;
}
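
// A small, concrete check of FindMedian (illustrative only; the function
// and data here are a hypothetical test, not part of IPPL): with four
// cells of equal weight and two processors, the median lands on the
// midpoint, so each side receives half of the total weight.
#if 0
#include <vector>

static void FindMedianExample()
{
    std::vector<double> w(4, 2.0);   // four cells, weight 2 each
    std::vector<double>::iterator m =
        FindMedian(2, w.begin(), w.end(), double());
    // m - w.begin() == 2: cells 0..1 go to the left half, cells 2..3 to
    // the right half, i.e. a weight of 4 on each side.
}
#endif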

//
// Routines for doing reductions over all dimensions but one of a
// local BrickIterator.
//
// These will only work for 1, 2 and 3 dimensions right now.
// I'm sure there is a way to make this work efficiently
// for arbitrary dimension, but this works for now.
//

// Reduce over a 3 dimensional BrickIterator
// given the axis not to reduce on
// and where in that dimension to reduce.

static inline double
PerpReduce(BrickIterator<double,3>& data, int i, int cutAxis)
{
    double r = 0;
    if (cutAxis == 0)
    {
        int l1 = data.size(1);
        int l2 = data.size(2);
        if ( (l1 > 0) && (l2 > 0) )
            for (int j2 = 0; j2 < l2; ++j2)
                for (int j1 = 0; j1 < l1; ++j1)
                    r += data.offset(i,j1,j2);
    }
    else if (cutAxis == 1)
    {
        int l0 = data.size(0);
        int l2 = data.size(2);
        if ( (l0 > 0) && (l2 > 0) )
            for (int j2 = 0; j2 < l2; ++j2)
                for (int j0 = 0; j0 < l0; ++j0)
                    r += data.offset(j0,i,j2);
    }
    else if (cutAxis == 2)
    {
        int l0 = data.size(0);
        int l1 = data.size(1);
        if ( (l0 > 0) && (l1 > 0) )
            for (int j1 = 0; j1 < l1; ++j1)
                for (int j0 = 0; j0 < l0; ++j0)
                    r += data.offset(j0,j1,i);
    }
    return r;
}

// Reduce over a 2 dimensional BrickIterator
// given the axis not to reduce on
// and where in that dimension to reduce.

static inline double
PerpReduce(BrickIterator<double,2>& data, int i, int cutAxis)
{
    double r = 0;
    if (cutAxis == 0)
    {
        int length = data.size(1);
        for (int j = 0; j < length; ++j)
            r += data.offset(i,j);
    }
    else
    {
        int length = data.size(0);
        for (int j = 0; j < length; ++j)
            r += data.offset(j,i);
    }
    return r;
}

//
// Reduce over a 1 dimensional BrickIterator
// given the axis not to reduce on
// and where in that dimension to reduce.
//

static inline double
PerpReduce(BrickIterator<double,1>& data, int i, int)
{
    return data.offset(i);
}

//
// Reduce over all the dimensions but one of a brick iterator.
// Put the results in an array of doubles.
//

template<unsigned Dim>
static void
LocalReduce(double *reduced, int cutAxis, BrickIterator<double,Dim> data)
{
    int length = data.size(cutAxis);
    for (int i = 0; i < length; ++i)
        reduced[i] = PerpReduce(data, i, cutAxis);
}
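
// What LocalReduce computes, shown on a plain 2-D array instead of a
// BrickIterator (illustrative only; the function below is hypothetical):
// reducing perpendicular to cutAxis collapses the block to a 1-D weight
// profile along the cut axis, which is what FindMedian later consumes.
#if 0
static void LocalReduceSketch()
{
    const int n0 = 3, n1 = 4;
    double block[n0][n1] = { {1,1,1,1}, {2,2,2,2}, {3,3,3,3} };
    double profile[n0];
    for (int i = 0; i < n0; ++i) {      // position along the cut axis (axis 0)
        profile[i] = 0;
        for (int j = 0; j < n1; ++j)    // reduce over the perpendicular axis
            profile[i] += block[i][j];
    }
    // profile == {4, 8, 12}
}
#endif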

//
// For each domain, do the local reduction of
// data from weights, and send that to the node
// that is accumulating stuff for that domain.
//
// The local reductions take place all across the machine.
// The reductions for each domain are finished on a single processor.
// Each of those final reductions is on a different processor.
//

template<class IndexIterator, unsigned Dim>
static void
SendReduce(IndexIterator domainsBegin, IndexIterator domainsEnd,
           BareField<double,Dim>& weights, int tag)
{
    // Buffers to store up domains and blocks of reduced data.
    std::vector<double*> reducedBuffer;
    std::vector<Index> domainBuffer;
    // Loop over all of the domains. Keep a counter of which one you're on.
    int di;
    IndexIterator dp;
    /*out << "SendReduce, ndomains=" << domainsEnd-domainsBegin << endl;*/
    for (dp = domainsBegin, di = 0; dp != domainsEnd; ++dp, ++di)
    {
        /*out << "SendReduce, domain=" << *dp << endl;*/
        // Find the dimension we'll be cutting on.
        // We'll reduce in the dimensions perpendicular to this.
        int cutAxis = FindCutAxis(*dp, weights.getLayout());
        // Find the LFields on this processor that touch this domain.
        typename BareField<double,Dim>::iterator_if lf_p;
        for (lf_p = weights.begin_if(); lf_p != weights.end_if(); ++lf_p)
            if ( (*dp).touches( (*lf_p).second->getOwned() ) )
            {
                // Find the intersection with this LField.
                NDIndex<Dim> intersection =
                    (*dp).intersect( (*lf_p).second->getOwned() );
                // Allocate the accumulation buffer.
                int length = intersection[cutAxis].length();
                double *reduced = new double[length];
                // Reduce into the local buffer.
                /*out << "LocalReduce " << intersection << endl;*/
                LocalReduce(reduced, cutAxis, (*lf_p).second->begin(intersection));
                // Save the domain and the data.
                reducedBuffer.push_back(reduced);
                domainBuffer.push_back(intersection[cutAxis]);
            }

        // If we found any hits, send them out.
        int nrdomains = reducedBuffer.size();
        /*out << "nrdomains=" << nrdomains << endl;*/
        if ( nrdomains > 0 )
        {
            // Build a message to hold everything for this domain.
            Message *mess = new Message;
            // The number of reduced domains is the first thing in the message.
            mess->put(nrdomains);
            // Loop over the reduced domains, storing in the message each time.
            std::vector<Index>::iterator dbp = domainBuffer.begin();
            std::vector<double*>::iterator rbp = reducedBuffer.begin();
            for (int i = 0; i < nrdomains; ++i, ++dbp, ++rbp)
            {
                // First store the domain.
                /*out << "putMessage " << *dbp << endl;*/
                putMessage(*mess, *dbp);
                // Then the reduced data using begin/end iterators.
                // Tell the message to delete the memory when it is done.
                double *p = *rbp;
                mess->setCopy(false).setDelete(true).put(p, p+(*dbp).length());
            }
            // Send the message to proc di.
            Ippl::Comm->send(mess, di, tag);
        }
        // Clear out the buffers.
        domainBuffer.erase( domainBuffer.begin(), domainBuffer.end() );
        reducedBuffer.erase( reducedBuffer.begin(), reducedBuffer.end() );
        /*out << "Bottom of SendReduce loop" << endl;*/
    }
}

//
// Receive the messages with reduced data sent out in SendReduce.
// Finish the reduction.
// Return begin and end iterators for the reduced data.
//

template<unsigned Dim>
static void
ReceiveReduce(NDIndex<Dim>& domain, BareField<double,Dim>& weights,
              int reduce_tag, int nprocs,
              int& cutLoc, int& cutAxis)
{
    // Build a place to accumulate the reduced data.
    cutAxis = FindCutAxis(domain, weights.getLayout());
    /*out << "ReceiveReduce, cutAxis=" << cutAxis << endl;*/
    int i, length = domain[cutAxis].length();
    int offset = domain[cutAxis].first();
    std::vector<double> reduced(length);
    std::vector<double> subReduced(length);
    for (i = 0; i < length; ++i)
        reduced[i] = 0;

    // Build a count of the number of messages to expect.
    // We get *one message* from each node that has a touch.
    int expected = 0;
    int nodes = Ippl::getNodes();
    int mynode = Ippl::myNode();
    bool* found_touch = new bool[nodes];
    for (i = 0; i < nodes; ++i) found_touch[i] = false;
    // First look in the local vnodes of weights.
    typename BareField<double,Dim>::iterator_if lf_p, lf_end = weights.end_if();
    for (lf_p = weights.begin_if();
         lf_p != lf_end && !(found_touch[mynode]); ++lf_p) {
        // Expect a message if it touches.
        if ( (*lf_p).second->getOwned().touches(domain) )
            found_touch[mynode] = true;
    }
    // Now look in the remote parts of weights.
    // Get the range of remote vnodes that touch domain.
    typename FieldLayout<Dim>::touch_range_dv range =
        weights.getLayout().touch_range_rdv( domain );
    // Record the processors who have touches.
    typename FieldLayout<Dim>::touch_iterator_dv rf_p;
    for (rf_p = range.first; rf_p != range.second; ++rf_p) {
        int owner = (*((*rf_p).second)).getNode();
        found_touch[owner] = true;
    }
    // Now just count up the number of messages to receive.
    for (i = 0; i < nodes; ++i)
        if (found_touch[i]) expected++;
    delete [] found_touch;

    // Receive messages until we're done.
    while ( --expected >= 0 )
    {
        // Receive a message.
        int any_node = COMM_ANY_NODE;
        Message *mess = Ippl::Comm->receive_block(any_node, reduce_tag);
        PAssert(mess);
        // Loop over all the domains in this message.
        int received_domains = 0;
        mess->get(received_domains);
        while ( --received_domains >= 0 )
        {
            // Get the domain for the next part.
            Index rdomain;
            getMessage( *mess, rdomain );
            /*out << "ReceiveReduce, rdomain=" << rdomain << endl;*/
            // Get the incoming reduced data.
            int rfirst = rdomain.first() - offset;
            mess->get(subReduced[rfirst]);
            // Accumulate it with the rest.
            int rlast = rdomain.last() - offset;
            for (int j = rfirst; j <= rlast; ++j)
                reduced[j] += subReduced[j];
        }
        // Delete the message, we're done with it.
        delete mess;
    }

    // Get the median.
    cutLoc =
        FindMedian(nprocs, reduced.begin(), reduced.begin()+length, double())
        - reduced.begin() + domain[cutAxis].first();
    /*out << "ReceiveReduce, cutLoc=" << cutLoc << endl;*/
}

//
// Given the location and axis of the cut,
// broadcast to everybody.
//

inline void
BcastCuts(int cutLoc, int cutAxis, int bcast_tag)
{
    // Make a message.
    Message *mess = new Message();
    // Add the data to it.
    mess->put(cutLoc);
    mess->put(cutAxis);
    // Send it out.
    Ippl::Comm->broadcast_all(mess, bcast_tag);
}

//
// Receive the broadcast cuts.
// Cut up each of the domains using the cuts.
//

template<unsigned Dim>
static void
ReceiveCuts(std::vector< NDIndex<Dim> > &domains,
            std::vector< int >& nprocs,
            int bcast_tag)
{
    // Make a container to hold the split domains.
    int nDomains = domains.size();
    std::vector< NDIndex<Dim> > cutDomains(nDomains*2);
    std::vector<int> cutProcs(std::vector<int>::size_type(nDomains*2));

    // Everybody receives the broadcasts.
    // There will be one for each domain in the list.
    for (int expected = 0; expected < nDomains; ++expected)
    {
        // Receive each broadcast.
        // The processor number will correspond to the location
        // in the domains vector.
        int whichDomain = COMM_ANY_NODE;
        int cutLocation = 0, cutAxis = 0;
        Message *mess = Ippl::Comm->receive_block(whichDomain, bcast_tag);
        PAssert(mess);
        mess->get(cutLocation);
        mess->get(cutAxis);
        delete mess;

        // Split this domain.
        const NDIndex<Dim>& domain = domains[whichDomain];
        NDIndex<Dim>& left = cutDomains[ whichDomain*2 ];
        NDIndex<Dim>& right = cutDomains[ whichDomain*2+1 ];
        // Build the left and right domains.
        left = domain;
        right = domain;
        /*out << "Build indexes from : "
             << domain[cutAxis].first() << " "
             << cutLocation << " "
             << domain[cutAxis].last() << " "
             << endl;*/
        left[ cutAxis ] = Index( domain[cutAxis].first(), cutLocation-1 );
        right[ cutAxis ] = Index( cutLocation, domain[cutAxis].last() );

        int procs = nprocs[whichDomain];
        cutProcs[ whichDomain*2 ] = procs/2;
        cutProcs[ whichDomain*2+1 ] = procs - procs/2;
    }

    // Put the domains you've just built into the input containers.
    // Strip out the domains with no processors assigned.
    domains.clear();
    nprocs.clear();
    PAssert_EQ(cutProcs.size(), cutDomains.size());
    for (unsigned int i = 0; i < cutProcs.size(); ++i)
    {
        if ( cutProcs[i] != 0 )
        {
            domains.push_back(cutDomains[i]);
            nprocs.push_back(cutProcs[i]);
        }
        else
        {
            PAssert_EQ(cutDomains[i].size(), 0);
        }
    }
}
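
// The index arithmetic above with concrete numbers (illustrative only;
// this hypothetical snippet just mirrors the left/right construction):
// a 1-D extent 0..15 cut at location 6 on 5 processors becomes 0..5 on
// 2 processors and 6..15 on 3 processors.
#if 0
static void ReceiveCutsSketch()
{
    int first = 0, last = 15, cutLocation = 6, procs = 5;
    int leftFirst  = first,       leftLast  = cutLocation - 1;   // 0..5
    int rightFirst = cutLocation, rightLast = last;              // 6..15
    int leftProcs  = procs/2;            // 2
    int rightProcs = procs - procs/2;    // 3
    (void)leftFirst; (void)leftLast; (void)rightFirst; (void)rightLast;
    (void)leftProcs; (void)rightProcs;
}
#endif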

//
// Sweep through a list of domains, splitting each one
// according to the weights in a BareField.
//

template<unsigned Dim>
static void
CutEach(std::vector< NDIndex<Dim> >& domains,
        std::vector< int >& nprocs,
        BareField<double,Dim>& weights)
{
    // Get tags for the reduction and the broadcast.
    int reduce_tag = Ippl::Comm->next_tag( F_REDUCE_PERP_TAG, F_TAG_CYCLE );
    int bcast_tag = Ippl::Comm->next_tag( F_REDUCE_PERP_TAG, F_TAG_CYCLE );
    /*out << "reduce_tag=" << reduce_tag << endl;*/
    /*out << "bcast_tag=" << bcast_tag << endl;*/

    // Do the sends for the reduces.
    SendReduce(domains.begin(), domains.end(), weights, reduce_tag);

    // On the appropriate processors, receive the data for the reduce,
    // and broadcast the cuts.
    unsigned int mynode = Ippl::Comm->myNode();
    if ( mynode < domains.size() )
    {
        // Receive partially reduced data, finish the reduction, find the median.
        int cutAxis, cutLoc;
        ReceiveReduce(domains[mynode], weights, reduce_tag,
                      nprocs[mynode], cutLoc, cutAxis);
        // Broadcast those cuts out to everybody.
        BcastCuts(cutLoc, cutAxis, bcast_tag);
    }

    // Receive the broadcast cuts and slice up the domains.
    ReceiveCuts(domains, nprocs, bcast_tag);
}

template<unsigned Dim>
NDIndex<Dim>
CalcBinaryRepartition(FieldLayout<Dim>& layout, BareField<double,Dim>& weights)
{
    // Build a list of domains as we go.
    std::vector< NDIndex<Dim> > domains;
    std::vector<int> procs;

    /*out << "Starting CalcBinaryRepartition, outstanding msgs="
         << Ippl::Comm->getReceived()
         << endl;*/

    // Get the processors we'll be dealing with.
    int nprocs = Ippl::Comm->getNodes();
    int myproc = Ippl::Comm->myNode();
    domains.reserve(nprocs);
    procs.reserve(nprocs);
    // Start the list with just the top level domain.
    domains.push_back( layout.getDomain() );
    procs.push_back( nprocs );

    // mprocs is the max number of procs assigned to a domain.
    int mprocs = nprocs;

    // Loop as long as some domain has more than one proc assigned to it.
    while ( mprocs > 1 )
    {
        // Cut all the domains in half.
        CutEach(domains, procs, weights);

        // Find the max number of procs assigned to a domain.
        mprocs = 0;
        for (unsigned int i = 0; i < procs.size(); ++i)
            if (mprocs < procs[i]) mprocs = procs[i];
    }

    // Return the domain on this processor.

    // Seriously dirty fix: refuse to return degenerate (single-cell) domains.
    typename std::vector< NDIndex<Dim> >::iterator i;

    bool degenerated = false;

    for (i = domains.begin(); i != domains.end(); ++i)
    {
        for (unsigned int d = 0; d < Dim; ++d)
            if ((*i)[d].first() == (*i)[d].last())
            {
                degenerated = true;
                break;
            }
        if (degenerated)
            break;
    }

    if (!degenerated)
        return domains.begin()[myproc];
    else
    {
        throw BinaryRepartitionFailed();
    }
}
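
// Typical use (a sketch, not IPPL's own code; it assumes a FieldLayout
// and a BareField of weights already exist and that every node makes
// this call collectively, since the routine communicates):
#if 0
template<unsigned Dim>
void RebalanceSketch(FieldLayout<Dim>& layout, BareField<double,Dim>& weights)
{
    // Each node gets back its own new subdomain; the call may throw
    // BinaryRepartitionFailed if no usable cut could be found.
    NDIndex<Dim> myDomain = CalcBinaryRepartition(layout, weights);
    // The caller is then responsible for rebuilding the layout from the
    // per-node domains (normally done through the higher-level
    // BinaryRepartition() entry point rather than by hand).
}
#endif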

/***************************************************************************
 * $RCSfile: BinaryBalancer.cpp,v $   $Author: adelmann $
 * $Revision: 1.1.1.1 $   $Date: 2003/01/23 07:40:27 $
 * IPPL_VERSION_ID: $Id: BinaryBalancer.cpp,v 1.1.1.1 2003/01/23 07:40:27 adelmann Exp $
 ***************************************************************************/