src/Field/BareField.cpp

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 /***************************************************************************
00003  *
00004  * The IPPL Framework
00005  * 
00006  * This program was prepared by PSI. 
00007  * All rights in the program are reserved by PSI.
00008  * Neither PSI nor the author(s)
00009  * makes any warranty, express or implied, or assumes any liability or
00010  * responsibility for the use of this software
00011  *
00012  * Visit http://www.acl.lanl.gov/POOMS for more details
00013  *
00014  ***************************************************************************/
00015 
00016 // -*- C++ -*-
00017 /***************************************************************************
00018  *
00019  * The IPPL Framework
00020  * 
00021  *
00022  * Visit http://people.web.psi.ch/adelmann/ for more details
00023  *
00024  ***************************************************************************/
00025 
00026 // include files
00027 #include "Field/BareField.h"
00028 #include "Field/BrickExpression.h"
00029 #include "FieldLayout/FieldLayout.h"
00030 #include "Message/Communicate.h"
00031 #include "Message/GlobalComm.h"
00032 #include "Message/Tags.h"
00033 #include "Utility/Inform.h"
00034 #include "Utility/Unique.h"
00035 #include "Utility/IpplInfo.h"
00036 #include "Utility/IpplStats.h"
00037 #include "Profile/Profiler.h"
00038 
00039 #ifdef IPPL_STDSTL
00040 #include <map>
00041 #include <utility>
00042 using namespace std;
00043 #else
00044 #include <map.h>
00045 #include <multimap.h>
00046 #endif // IPPL_STDSTL
00047 
00048 #include <stdlib.h>
00049 
00050 #ifdef IPPL_NETCDF
00051 #include <netcdf.h>
00052 #endif 
00053 
00055 //
00056 // Copy ctor.
00057 //
00058 
00059 template< class T, unsigned Dim >
00060 BareField<T,Dim>::BareField(const BareField<T,Dim>& a)
00061 : Layout( a.Layout ),           // Copy the layout.
00062   Gc( a.Gc ),                   // Copy the number of guard cells.
00063   compressible_m( a.compressible_m )
00064 {
00065   TAU_TYPE_STRING(taustr, "void (" + CT(a) + " )" );
00066   TAU_PROFILE("BareField::BareField()", taustr, TAU_FIELD);
00067 
00068   // We assume the guard cells are peachy so clear the dirty flag.
00069   clearDirtyFlag();
00070 
00071   // Reserve space for the pointers to the LFields.
00072   Locals_ac.reserve( a.size_if() );
00073   // Loop over the LFields of the other array, creating LFields as we go.
00074   for ( const_iterator_if a_i = a.begin_if(); a_i != a.end_if(); ++a_i )
00075     {
00076       // Create an LField with copy ctor.
00077       LField<T,Dim> *lf = new LField<T,Dim>( *((*a_i).second) );
00078       // Insert in the local list, hinting that it should go at the end.
00079 #ifdef IPPL_SGI_CC_721_TYPENAME_BUG
00080       Locals_ac.insert( Locals_ac.end(),
00081                         ac_id_larray::value_type((*a_i).first,lf) );
00082 #else
00083       Locals_ac.insert( Locals_ac.end(),
00084                         typename ac_id_larray::value_type((*a_i).first,lf) );
00085 #endif // IPPL_SGI_CC_721_TYPENAME_BUG
00086     }
00087   // Tell the layout we are here.
00088   getLayout().checkin( *this , Gc );
00089   //INCIPPLSTAT(incBareFields);
00090 }
00091 
00092 
00094 //
00095 // Destructor
00096 //
00097 
00098 template< class T, unsigned Dim >
00099 BareField<T,Dim>::~BareField() {
00100   TAU_TYPE_STRING(taustr, CT(*this) + " void ()" );
00101   TAU_PROFILE("BareField::~BareField()", taustr, TAU_FIELD);
00102   // must check out from our layout
00103   if (Layout != 0) {
00104     Layout->checkout(*this);
00105   }
00106 }
00107 
00108 
00110 // Initialize the field, if it was constructed from the default constructor.
00111 // This should NOT be called if the field was constructed by providing
00112 // a FieldLayout.
00113 
00114 template< class T, unsigned Dim >
00115 void
00116 BareField<T,Dim>::initialize(Layout_t & l) {
00117   TAU_TYPE_STRING(taustr, "void (" + CT(l) + " )" );
00118   TAU_PROFILE("BareField::initialize()", taustr, TAU_FIELD);
00119 
00120   // if our Layout has been previously set, we just ignore this request
00121   if (Layout == 0) {
00122     Layout = &l;
00123     setup();
00124   }
00125 }
00126 
00127 template< class T, unsigned Dim >
00128 void
00129 BareField<T,Dim>::initialize(Layout_t & l,
00130                              const GuardCellSizes<Dim>& gc) {
00131   TAU_TYPE_STRING(taustr, "void (" + CT(l) + ", " + CT(gc) + " )" );
00132   TAU_PROFILE("BareField::initialize()", taustr, TAU_FIELD);
00133 
00134   // if our Layout has been previously set, we just ignore this request
00135   if (Layout == 0) {
00136     Layout = &l;
00137     Gc = gc;
00138     setup();
00139   }
00140 }
00141 
00142 
00144 //
00145 // Using the data that has been initialized by the ctors,
00146 // complete the construction by allocating the LFields.
00147 //
00148 
00149 template< class T, unsigned Dim >
00150 void
00151 BareField<T,Dim>::setup()
00152 {
00153   TAU_TYPE_STRING(taustr, CT(*this) + " void ()" );
00154   TAU_PROFILE("BareField::setup()", taustr, TAU_FIELD | TAU_ASSIGN);
00155 
00156   // Make sure this FieldLayout can handle the number of GuardCells
00157   // which we have here
00158   if ( ! getLayout().fitsGuardCells(Gc)) {
00159     ERRORMSG("Cannot construct IPPL BareField:" << endl);
00160     ERRORMSG("  One or more vnodes is too small for the guard cells." << endl);
00161     ERRORMSG("  Guard cell sizes = " << Gc << endl);
00162     ERRORMSG("  FieldLayout = " << getLayout() << endl);
00163     Ippl::abort();
00164   }
00165 
00166   // We assume the guard cells are peachy so clear the dirty flag.
00167   clearDirtyFlag();
00168 
00169   // Reserve space for the pointers to the LFields.
00170   Locals_ac.reserve( getLayout().size_iv() );
00171 
00172   // Loop over all the Vnodes, creating an LField in each.
00173   for (typename Layout_t::iterator_iv v_i=getLayout().begin_iv();
00174        v_i != getLayout().end_iv();
00175        ++v_i)
00176     {
00177       // Get the owned and guarded sizes.
00178       const NDIndex<Dim> &owned = (*v_i).second->getDomain();
00179       NDIndex<Dim> guarded = AddGuardCells( owned , Gc );
00180 
00181       // Get the global vnode number (ID number, value from 0 to nvnodes-1):
00182       int vnode = (*v_i).second->getVnode();
00183 
00184       // Construct the LField for this Vnode.
00185       LField<T,Dim> *lf = new LField<T,Dim>( owned, guarded, vnode );
00186 
00187       // Put it in the list.
00188 #ifdef __MWERKS__
00189       Locals_ac.insert( end_if(), 
00190                         ac_id_larray::value_type((*v_i).first,lf));
00191 #else
00192 #ifdef IPPL_SGI_CC_721_TYPENAME_BUG
00193       Locals_ac.insert( end_if(), 
00194                         ac_id_larray::value_type((*v_i).first,lf));
00195 #else
00196       Locals_ac.insert( end_if(), 
00197                         typename ac_id_larray::value_type((*v_i).first,lf));
00198 #endif // IPPL_SGI_CC_721_TYPENAME_BUG
00199 #endif // __MWERKS__
00200     }
00201   // Tell the layout we are here.
00202   getLayout().checkin( *this , Gc );
00203   //INCIPPLSTAT(incBareFields);
00204 }
00205 
00207 
00208 //
00209 // Print a BareField out.
00210 //
00211 
00212 template< class T, unsigned Dim>
00213 void 
00214 BareField<T,Dim>::write(ostream& out)
00215 {
00216   TAU_TYPE_STRING(taustr, CT(*this) + " void (ostream )" );
00217   TAU_PROFILE("BareField::write()", taustr, TAU_FIELD | TAU_IO);
00218 
00219   // Inform dbgmsg(">>>>>>>> BareField::write", INFORM_ALL_NODES);
00220   // dbgmsg << "Printing values for field at address = " << &(*this) << endl;
00221 
00222   // on remote nodes, we must send the subnodes LField's to node 0
00223   int tag = Ippl::Comm->next_tag(F_WRITE_TAG, F_TAG_CYCLE);
00224   if (Ippl::myNode() != 0) {
00225     for ( iterator_if local = begin_if(); local != end_if() ; ++local) {
00226       // Cache some information about this local field.
00227       LField<T,Dim>&  l = *((*local).second);
00228       NDIndex<Dim>&  lo = (NDIndex<Dim>&) l.getOwned();
00229       typename LField<T,Dim>::iterator rhs(l.begin());
00230 
00231       // Build and send a message containing the owned LocalField data
00232       if (Ippl::myNode() != 0) {
00233         Message *mess = new Message();
00234         lo.putMessage(*mess);         // send the local domain of the LField
00235         rhs.putMessage(*mess);          // send the data itself
00236         // dbgmsg << "Sending domain " << lo << " to node 0" << endl;
00237         Ippl::Comm->send(mess, 0, tag);
00238       }
00239     }
00240   } else {    // now, on node 0, receive the remaining LField's ...
00241     // put all the LField's in a big, uncompressed LField
00242     LField<T,Dim> data(getDomain(), getDomain());
00243     data.Uncompress();
00244 
00245     // first put in our local ones
00246     for ( iterator_if local = begin_if(); local != end_if() ; ++local) {
00247       // Cache some information about this local field.
00248       LField<T,Dim>&  l = *((*local).second);
00249       NDIndex<Dim>&  lo = (NDIndex<Dim>&) l.getOwned();
00250       typename LField<T,Dim>::iterator rhs(l.begin());
00251       
00252       // put the local LField in our big LField
00253       // dbgmsg << "  Copying local domain " << lo << " from Lfield at ";
00254       // dbgmsg << &l << ":" << endl;
00255       typename LField<T,Dim>::iterator putloc = data.begin(lo);
00256       typename LField<T,Dim>::iterator getloc = l.begin(lo);
00257       for ( ; getloc != l.end() ; ++putloc, ++getloc ) {
00258         // dbgmsg << "    from " << &(*getloc) << " to " << &(*putloc);
00259         // dbgmsg << ": " << *putloc << " = " << *getloc << endl;
00260         *putloc = *getloc;
00261       }
00262     }
00263 
00264     // we expect to receive one message from each remote vnode
00265     int remaining = getLayout().size_rdv();
00266 
00267     // keep receiving messages until they're all here
00268     for ( ; remaining > 0; --remaining) {
00269       // Receive the generic message.
00270       int any_node = COMM_ANY_NODE;
00271       Message *mess = Ippl::Comm->receive_block(any_node, tag);
00272 
00273       // Extract the domain size and LField iterator from the message
00274       NDIndex<Dim> lo;
00275       T compressed_data;
00276       typename LField<T,Dim>::iterator rhs(compressed_data);
00277       lo.getMessage(*mess);
00278       rhs.getMessage(*mess);
00279       // dbgmsg << "Received domain " << lo << " from " << any_node << endl;
00280 
00281       // put the received LField in our big LField
00282       typename LField<T,Dim>::iterator putloc = data.begin(lo);
00283       for (unsigned elems=lo.size(); elems > 0; ++putloc, ++rhs, --elems)
00284         *putloc = *rhs;
00285 
00286       // Free the memory in the message.
00287       delete mess;
00288     }
00289 
00290     // finally, we can print the big LField out
00291     out << data;
00292   }
00293 }
00294 
00295 
00297 
00298 //
00299 // Fill all the guard cells, including all the necessary communication.
00300 //
00301 
00302 template< class T, unsigned Dim >
00303 void BareField<T,Dim>::fillGuardCells(bool reallyFill) const
00304 {
00305   // profiling macros
00306   TAU_TYPE_STRING(taustr, CT(*this) + " void ()" );
00307   TAU_PROFILE("BareField::fillGuardCells()", taustr, TAU_FIELD);
00308 
00309   TAU_PROFILE_TIMER(sendtimer, "  fillGuardCells-send", 
00310                     taustr, TAU_FIELD);
00311   TAU_PROFILE_TIMER(sendcommtimer, "   fillGuardCells-send-comm",
00312                     taustr, TAU_FIELD);
00313   TAU_PROFILE_TIMER(findrectimer, "  fillGuardCells-findreceive",
00314                     taustr, TAU_FIELD);
00315   TAU_PROFILE_TIMER(localstimer, "  fillGuardCells-locals",
00316                     taustr, TAU_FIELD);
00317   TAU_PROFILE_TIMER(rectimer, "  fillGuardCells-receive",
00318                     taustr, TAU_FIELD);
00319   TAU_PROFILE_TIMER(reccommtimer, "   fillGuardCells-receive-comm",
00320                     taustr, TAU_FIELD);
00321   TAU_PROFILE_TIMER(localsexprtimer, "   fillGuardCells-locals-expression",
00322                     taustr, TAU_FIELD);
00323 
00324   // This operation is logically const because the physical cells
00325   // of the BareField are unaffected, so cast away const here.
00326   BareField<T,Dim>& ncf = const_cast<BareField<T,Dim>&>(*this);
00327 
00328   // Only fill if we are supposed to.
00329   if (!reallyFill)
00330     return;
00331 
00332   // Make ourselves clean.
00333   ncf.clearDirtyFlag();
00334 
00335   // Only need to do work if we have non-zero GuardCellSizes
00336   if (Gc == GuardCellSizes<Dim>())
00337     return;
00338 
00339   // indicate we're doing another gc fill
00340   //INCIPPLSTAT(incGuardCellFills);
00341 
00342   // iterators for looping through LField's of this BareField
00343   iterator_if lf_i, lf_e = ncf.end_if();
00344 
00345 #ifdef IPPL_PRINTDEBUG
00346   Inform msg("fillGuardCells", INFORM_ALL_NODES);
00347 #endif
00348 
00349   // ----------------------------------------
00350   // First the send loop.
00351   // Loop over all the local nodes and
00352   // send data to the remote ones they overlap.
00353   int nprocs = Ippl::getNodes();
00354   if (nprocs > 1) {
00355     TAU_PROFILE_START(findrectimer);
00356     // Build a map of the messages we expect to receive.
00357     typedef multimap< NDIndex<Dim> , LField<T,Dim>* , less<NDIndex<Dim> > > ac_recv_type;
00358     ac_recv_type recv_ac;
00359     bool* recvmsg = new bool[nprocs];
00360     TAU_PROFILE_STOP(findrectimer);
00361 
00362     TAU_PROFILE_START(sendtimer);
00363     // set up messages to be sent
00364     Message** mess = new Message*[nprocs];
00365 #ifdef IPPL_PRINTDEBUG
00366     int* ndomains = new int[nprocs];
00367 #endif
00368     int iproc;
00369     for (iproc=0; iproc<nprocs; ++iproc) {
00370       mess[iproc] = NULL;
00371       recvmsg[iproc] = false;
00372 #ifdef IPPL_PRINTDEBUG
00373       ndomains[iproc] = 0;
00374 #endif
00375     }
00376     TAU_PROFILE_STOP(sendtimer);
00377     // now do main loop over LFields, packing overlaps into proper messages
00378     for (lf_i = ncf.begin_if(); lf_i != lf_e; ++lf_i) {
00379       TAU_PROFILE_START(findrectimer);
00380       // Cache some information about this local array.
00381       LField<T,Dim> &lf = *((*lf_i).second);
00382       const NDIndex<Dim> &lf_domain = lf.getAllocated();
00383       // Find the remote ones that touch this LField's guard cells
00384       typename Layout_t::touch_range_dv
00385         rrange(ncf.getLayout().touch_range_rdv(lf_domain));
00386       typename Layout_t::touch_iterator_dv rv_i;
00387       for (rv_i = rrange.first; rv_i != rrange.second; ++rv_i) {
00388         // Save the intersection and the LField it is for.
00389         NDIndex<Dim> sub = lf_domain.intersect( (*rv_i).first );
00390         typedef typename ac_recv_type::value_type value_type;
00391         recv_ac.insert(value_type(sub,&lf));
00392         // note who will be sending this data
00393         int rnode = (*rv_i).second->getNode();
00394         recvmsg[rnode] = true;
00395       }
00396       TAU_PROFILE_STOP(findrectimer);
00397 
00398       TAU_PROFILE_START(sendtimer);
00399       const NDIndex<Dim>& lo = lf.getOwned();
00400 #ifdef IPPL_PRINTDEBUG
00401       msg << "Finding send overlap regions for domain " << lo << endl;
00402 #endif
00403       // Loop over the remote domains which have guard cells
00404       // that this local domain touches.
00405       typename Layout_t::touch_range_dv
00406         range( ncf.getLayout().touch_range_rdv(lo,ncf.getGC()) );
00407       typename Layout_t::touch_iterator_dv remote_i;
00408       for (remote_i = range.first; remote_i != range.second; ++remote_i) {
00409         // Find the intersection.
00410         NDIndex<Dim> intersection = lo.intersect( (*remote_i).first );
00411         // Find out who owns this remote domain.
00412         int rnode = (*remote_i).second->getNode();
00413 #ifdef IPPL_PRINTDEBUG
00414         msg << "  Overlap domain = " << (*remote_i).first << endl;
00415         msg << "  Inters. domain = " << intersection;
00416         msg << " --> node " << rnode << endl;
00417 #endif
00418         // Create an LField iterator for this intersection region,
00419         // and try to compress it.
00420         // storage for LField compression
00421         T compressed_value;
00422         LFI msgval = lf.begin(intersection, compressed_value);
00423         msgval.TryCompress();
00424 
00425         // Put intersection domain and field data into message
00426         if (!mess[rnode]) mess[rnode] = new Message;
00427         PAssert(mess[rnode]);
00428         mess[rnode]->put(intersection); // puts Dim items in Message
00429         mess[rnode]->put(msgval);       // puts 3 items in Message
00430 #ifdef IPPL_PRINTDEBUG
00431         ndomains[rnode]++;
00432 #endif
00433       }
00434     TAU_PROFILE_STOP(sendtimer);
00435     }
00436     TAU_PROFILE_START(findrectimer);
00437     int remaining = 0;
00438     for (iproc=0; iproc<nprocs; ++iproc)
00439       if (recvmsg[iproc]) ++remaining;
00440     delete [] recvmsg;
00441     TAU_PROFILE_STOP(findrectimer);
00442 
00443     TAU_PROFILE_START(sendtimer);
00444     // Get message tag.
00445     int tag = Ippl::Comm->next_tag( F_GUARD_CELLS_TAG , F_TAG_CYCLE );
00446     TAU_PROFILE_START(sendcommtimer);
00447     // Send all the messages.
00448     for (iproc=0; iproc<nprocs; ++iproc) {
00449       if (mess[iproc]) {
00450 #ifdef IPPL_PRINTDEBUG
00451         msg << "fillGuardCells: Sending message to node " << iproc << endl
00452             << "  number of domains  = " << ndomains[iproc] << endl
00453             << "  number of MsgItems = " << mess[iproc]->size() << endl;
00454 #endif
00455         Ippl::Comm->send(mess[iproc], iproc, tag);
00456       }
00457     }
00458     TAU_PROFILE_STOP(sendcommtimer);
00459     delete [] mess;
00460 #ifdef IPPL_PRINTDEBUG
00461     delete [] ndomains;
00462 #endif
00463     TAU_PROFILE_STOP(sendtimer);
00464 
00465     // ----------------------------------------
00466     // Handle the local fills.
00467     // Loop over all the local arrays.
00468     TAU_PROFILE_START(localstimer);
00469     for (lf_i = ncf.begin_if(); lf_i != lf_e; ++lf_i)
00470     {
00471       // Cache some information about this LField.
00472       LField<T,Dim> &lf = *(*lf_i).second;
00473       const NDIndex<Dim>& la = lf.getAllocated();
00474 
00475       // Loop over all the other LFields to establish each LField's
00476       // cache of overlaps.
00477         
00478       // This is an N*N operation, but the expectation is that this will
00479       // pay off since we will reuse this cache quite often.
00480 
00481       if (!lf.OverlapCacheInitialized())
00482       {
00483         for (iterator_if rf_i = ncf.begin_if(); rf_i != ncf.end_if(); ++rf_i)
00484           if ( rf_i != lf_i )
00485           {
00486             // Cache some info about this LField.
00487             LField<T,Dim> &rf = *(*rf_i).second;
00488             const NDIndex<Dim>& ro = rf.getOwned();
00489 
00490             // If the remote has info we want, then add it to our cache.
00491             if ( la.touches(ro) )
00492               lf.AddToOverlapCache(&rf);
00493           }
00494       }
00495 
00496       // We know at this point that the overlap cache is established.
00497       // Use it.
00498 
00499       for (typename LField<T,Dim>::OverlapIterator rf_i = lf.BeginOverlap();
00500            rf_i != lf.EndOverlap(); ++rf_i)
00501       {
00502         LField<T, Dim> &rf = *(*rf_i);
00503         const NDIndex<Dim>& ro = rf.getOwned();
00504 
00505         bool c1 = lf.IsCompressed();
00506         bool c2 = rf.IsCompressed();
00507         bool c3 = *rf.begin() == *lf.begin();
00508 
00509         // If these are compressed we might not have to do any work.
00510         if ( !( c1 && c2 && c3 ) )
00511         {
00512           TAU_PROFILE_START(localsexprtimer);
00513 
00514           // Find the intersection.
00515           NDIndex<Dim> intersection = la.intersect(ro);
00516                 
00517           // Build an iterator for the rhs.
00518           LFI rhs = rf.begin(intersection);
00519                 
00520           // Could we compress that?
00521           if ( !(c1 && rhs.CanCompress(*lf.begin())) )
00522           {
00523             // Make sure the left is not compressed.
00524             lf.Uncompress();
00525                   
00526             // Nope, we really have to copy.
00527             LFI lhs = lf.begin(intersection);
00528 #ifdef IPPL_PRINTDEBUG
00529             msg << "  Inters. domain=" << intersection << endl;
00530 #endif
00531             // And do the assignment.
00532             BrickExpression<Dim,LFI,LFI,OpAssign>(lhs,rhs).apply();
00533           }
00534               
00535           TAU_PROFILE_STOP(localsexprtimer);
00536         }
00537 #ifdef IPPL_PRINTDEBUG
00538         else {
00539           msg << "  Both sides compressed and equal ... val = ";
00540           msg << *rf.begin() << endl;
00541         }
00542 #endif
00543       }
00544     }
00545     TAU_PROFILE_STOP(localstimer);
00546 
00547     // ----------------------------------------
00548     // Receive all the messages.
00549     TAU_PROFILE_START(rectimer);
00550     while (remaining>0) {
00551       // Receive the next message.
00552       int any_node = COMM_ANY_NODE;
00553       TAU_PROFILE_START(reccommtimer);
00554       Message* rmess = Ippl::Comm->receive_block(any_node,tag);
00555       PAssert(rmess);
00556       --remaining;
00557       TAU_PROFILE_STOP(reccommtimer);
00558 
00559       // Determine the number of domains being sent
00560       int ndoms = rmess->size() / (Dim + 3);
00561 #ifdef IPPL_PRINTDEBUG
00562       msg << "fillGuardCells: Message received from node " << any_node
00563           << ", number of domains = " << ndoms << endl;
00564 #endif
00565       for (int idom=0; idom<ndoms; ++idom) {
00566         // Extract the intersection domain from the message.
00567         NDIndex<Dim> intersection;
00568         intersection.getMessage(*rmess);
00569 
00570         // Extract the rhs iterator from it.
00571         T compressed_value;
00572         LFI rhs(compressed_value);
00573         rhs.getMessage(*rmess);
00574 #ifdef IPPL_PRINTDEBUG
00575         msg << "Received remote overlap region = " << intersection << endl;
00576 #endif
00577 
00578         // Find the LField it is destined for.
00579         typename ac_recv_type::iterator hit = recv_ac.find( intersection );
00580         PAssert( hit != recv_ac.end() );
00581         // Build the lhs brick iterator.
00582         LField<T,Dim> &lf = *(*hit).second;
00583         // Check and see if we really have to do this.
00584 #ifdef IPPL_PRINTDEBUG
00585         msg << "   LHS compressed ? " << lf.IsCompressed();
00586         msg << ", LHS value = " << *lf.begin() << endl;
00587         msg << "   RHS compressed ? " << rhs.IsCompressed();
00588         msg << ", RHS value = " << *rhs << endl;
00589         msg << "   *rhs == *lf.begin() ? " << (*rhs == *lf.begin()) << endl;
00590 #endif
00591         if ( !(rhs.IsCompressed() && lf.IsCompressed() &&
00592              (*rhs == *lf.begin())) )
00593         {
00594           // Yep. gotta do it.
00595           lf.Uncompress();
00596           LFI lhs = lf.begin(intersection);
00597           // Do the assignment.
00598 #ifdef IPPL_PRINTDEBUG
00599           msg << "   Doing copy." << endl;
00600 #endif
00601           BrickExpression<Dim,LFI,LFI,OpAssign>(lhs,rhs).apply();
00602         }
00603 
00604         // Take that entry out of the receive list.
00605         recv_ac.erase( hit );
00606       }
00607       delete rmess;
00608     }
00609     TAU_PROFILE_STOP(rectimer);
00610   }
00611   else { // single-node case
00612     // ----------------------------------------
00613     // Handle the local fills.
00614     // Loop over all the local arrays.
00615     TAU_PROFILE_START(localstimer);
00616     for (lf_i = ncf.begin_if(); lf_i != lf_e; ++lf_i)
00617     {
00618       // Cache some information about this LField.
00619       LField<T,Dim> &lf = *(*lf_i).second;
00620       const NDIndex<Dim>& la = lf.getAllocated();
00621 
00622       // Loop over all the other LFields to establish each LField's
00623       // cache of overlaps.
00624         
00625       // This is an N*N operation, but the expectation is that this will
00626       // pay off since we will reuse this cache quite often.
00627 
00628       if (!lf.OverlapCacheInitialized())
00629       {
00630         for (iterator_if rf_i = ncf.begin_if(); rf_i != ncf.end_if(); ++rf_i)
00631           if ( rf_i != lf_i )
00632           {
00633             // Cache some info about this LField.
00634             LField<T,Dim> &rf = *(*rf_i).second;
00635             const NDIndex<Dim>& ro = rf.getOwned();
00636 
00637             // If the remote has info we want, then add it to our cache.
00638             if ( la.touches(ro) )
00639               lf.AddToOverlapCache(&rf);
00640           }
00641       }
00642 
00643       // We know at this point that the overlap cache is established.
00644       // Use it.
00645 
00646       for (typename LField<T,Dim>::OverlapIterator rf_i = lf.BeginOverlap();
00647            rf_i != lf.EndOverlap(); ++rf_i)
00648       {
00649         LField<T, Dim> &rf = *(*rf_i);
00650         const NDIndex<Dim>& ro = rf.getOwned();
00651 
00652         bool c1 = lf.IsCompressed();
00653         bool c2 = rf.IsCompressed();
00654         bool c3 = *rf.begin() == *lf.begin();
00655 
00656         // If these are compressed we might not have to do any work.
00657         if ( !( c1 && c2 && c3 ) )
00658         {
00659           TAU_PROFILE_START(localsexprtimer);
00660 
00661           // Find the intersection.
00662           NDIndex<Dim> intersection = la.intersect(ro);
00663                 
00664           // Build an iterator for the rhs.
00665           LFI rhs = rf.begin(intersection);
00666                 
00667           // Could we compress that?
00668           if ( !(c1 && rhs.CanCompress(*lf.begin())) )
00669           {
00670             // Make sure the left is not compressed.
00671             lf.Uncompress();
00672                   
00673             // Nope, we really have to copy.
00674             LFI lhs = lf.begin(intersection);
00675 #ifdef IPPL_PRINTDEBUG
00676             msg << "  Inters. domain=" << intersection << endl;
00677 #endif
00678             // And do the assignment.
00679             BrickExpression<Dim,LFI,LFI,OpAssign>(lhs,rhs).apply();
00680           }
00681               
00682           TAU_PROFILE_STOP(localsexprtimer);
00683         }
00684 #ifdef IPPL_PRINTDEBUG
00685         else {
00686           msg << "  Both sides compressed and equal ... val = ";
00687           msg << *rf.begin() << endl;
00688         }
00689 #endif
00690       }
00691     }
00692     TAU_PROFILE_STOP(localstimer);
00693   }
00694   return;
00695 }
00696 
00698 
00699 //
00700 // Fill guard cells with a constant value.
00701 // This is typically used to zero out the guard cells before a scatter.
00702 //
00703 
00704 template <class T, unsigned Dim>
00705 void BareField<T,Dim>::setGuardCells(const T& val) const
00706 {
00707   // profiling macros
00708   TAU_TYPE_STRING(taustr, CT(*this) + " void (" + CT(val) + ")" );
00709   TAU_PROFILE("BareField::setGuardCells()", taustr, TAU_FIELD);
00710 
00711   // if there are no guard cells, we can just return
00712   if (Gc == GuardCellSizes<Dim>())
00713     return;
00714 
00715   // this member function is logically const, so cast away const-ness
00716   BareField<T,Dim>& ncf = const_cast<BareField<T,Dim>&>(*this);
00717 
00718   // loop over our LFields and set guard cell values
00719   iterator_if lf_i, lf_e = ncf.end_if();
00720   for (lf_i = ncf.begin_if(); lf_i != lf_e; ++lf_i) {
00721     // Get this LField
00722     LField<T,Dim>& lf = *((*lf_i).second);
00723 
00724     // Quick test to see if we can avoid doing any work.
00725     // If this LField is compressed and already contains
00726     // the value that is being assigned to the guard cells,
00727     // then we can just move on to the next LField.
00728     if (lf.IsCompressed() && lf.getCompressedData() == val)
00729       continue;
00730 
00731     // OK, we really have to fill the guard cells, so get 
00732     // the domains we will be working with.
00733     const NDIndex<Dim>& adom = lf.getAllocated();
00734     const NDIndex<Dim>& odom = lf.getOwned();
00735     NDIndex<Dim> dom = odom;  // initialize working domain to Owned
00736 
00737     // create a compressed LField with the same allocated domain
00738     // containing the value to be assigned to the guard cells
00739     LField<T,Dim> rf(odom,adom);
00740     rf.Compress(val);
00741 
00742     // Uncompress LField to be filled and get some iterators
00743     lf.Uncompress();
00744     LFI lhs, rhs;
00745 
00746     // now loop over dimensions and fill guard cells along each
00747     for (int idim = 0; idim < Dim; ++idim) {
00748       // set working domain to left guards in this dimension
00749       dom[idim] = Index(adom[idim].first(),
00750                         adom[idim].first() + Gc.left(idim) - 1);
00751       if (!dom[idim].empty()) {
00752         // Set iterators over working domain and do assignment
00753         lhs = lf.begin(dom);
00754         rhs = rf.begin(dom);
00755         BrickExpression<Dim,LFI,LFI,OpAssign>(lhs,rhs).apply();
00756       }
00757 
00758       // Set working domain to right guards in this dimension
00759       dom[idim] = Index(adom[idim].last() - Gc.right(idim) + 1,
00760                         adom[idim].last());
00761       if (!dom[idim].empty()) {
00762         // Set iterators over working domain and do assignment
00763         lhs = lf.begin(dom);
00764         rhs = rf.begin(dom);
00765         BrickExpression<Dim,LFI,LFI,OpAssign>(lhs,rhs).apply();
00766       }
00767 
00768       // Adjust working domain along this direction 
00769       // in preparation for working on next dimension.
00770       dom[idim] = adom[idim];
00771     }
00772   }
00773 
00774   // let's set the dirty flag, since we have modified guard cells.
00775   ncf.setDirtyFlag();
00776 
00777   return;
00778 }
00779 
00781 
00782 //
00783 // Accumulate guard cells into real cells, including necessary communication.
00784 //
00785 
00786 template <class T, unsigned Dim>
00787 void BareField<T,Dim>::accumGuardCells()
00788 {
00789   // profiling macros
00790   TAU_TYPE_STRING(taustr, CT(*this) + " void ()" );
00791   TAU_PROFILE("BareField::accumGuardCells()", taustr, TAU_FIELD);
00792 
00793   TAU_PROFILE_TIMER(sendtimer, "  accumGuardCells-send", 
00794                     taustr, TAU_FIELD);
00795   TAU_PROFILE_TIMER(sendcommtimer, "   accumGuardCells-send-comm",
00796                     taustr, TAU_FIELD);
00797   TAU_PROFILE_TIMER(findrectimer, "  accumGuardCells-findreceive",
00798                     taustr, TAU_FIELD);
00799   TAU_PROFILE_TIMER(localstimer, "  accumGuardCells-locals",
00800                     taustr, TAU_FIELD);
00801   TAU_PROFILE_TIMER(rectimer, "  accumGuardCells-receive",
00802                     taustr, TAU_FIELD);
00803   TAU_PROFILE_TIMER(reccommtimer, "   accumGuardCells-receive-comm",
00804                     taustr, TAU_FIELD);
00805   TAU_PROFILE_TIMER(localsexprtimer, "   accumGuardCells-locals-expression",
00806                     taustr, TAU_FIELD);
00807 
00808   // Only need to do work if we have non-zero GuardCellSizes
00809   if (Gc == GuardCellSizes<Dim>())
00810     return;
00811 
00812   // iterators for looping through LField's of this BareField
00813   iterator_if lf_i, lf_e = end_if();
00814 
00815 #ifdef IPPL_PRINTDEBUG
00816   Inform msg("accumGuardCells", INFORM_ALL_NODES);
00817 #endif
00818 
00819   // ----------------------------------------
00820   // First the send loop.
00821   // Loop over all the local nodes and
00822   // send data to the remote ones they overlap.
00823   int nprocs = Ippl::getNodes();
00824   if (nprocs > 1) {
00825     TAU_PROFILE_START(findrectimer);
00826     // Build a map of the messages we expect to receive.
00827     typedef multimap< NDIndex<Dim> , LField<T,Dim>* , less<NDIndex<Dim> > > ac_recv_type;
00828     ac_recv_type recv_ac;
00829     bool* recvmsg = new bool[nprocs];
00830     TAU_PROFILE_STOP(findrectimer);
00831 
00832     TAU_PROFILE_START(sendtimer);
00833     // set up messages to be sent
00834     Message** mess = new Message*[nprocs];
00835 #ifdef IPPL_PRINTDEBUG
00836     int* ndomains = new int[nprocs];
00837 #endif
00838     int iproc;
00839     for (iproc=0; iproc<nprocs; ++iproc) {
00840       mess[iproc] = NULL;
00841       recvmsg[iproc] = false;
00842 #ifdef IPPL_PRINTDEBUG
00843       ndomains[iproc] = 0;
00844 #endif
00845     }
00846     TAU_PROFILE_STOP(sendtimer);
00847     // now do main loop over LFields, packing overlaps into proper messages
00848     for (lf_i = begin_if(); lf_i != lf_e; ++lf_i) {
00849       TAU_PROFILE_START(findrectimer);
00850       // Cache some information about this local array.
00851       LField<T,Dim> &lf = *((*lf_i).second);
00852       const NDIndex<Dim>& lo = lf.getOwned();
00853       // Find the remote ones with guard cells touching this LField
00854       typename Layout_t::touch_range_dv
00855         rrange( getLayout().touch_range_rdv(lo,Gc) );
00856       typename Layout_t::touch_iterator_dv rv_i;
00857       for (rv_i = rrange.first; rv_i != rrange.second; ++rv_i) {
00858         // Save the intersection and the LField it is for.
00859         NDIndex<Dim> sub = lo.intersect( (*rv_i).first );
00860         recv_ac.insert(ac_recv_type::value_type(sub,&lf));
00861         // note who will be sending this data
00862         int rnode = (*rv_i).second->getNode();
00863         recvmsg[rnode] = true;
00864       }
00865       TAU_PROFILE_STOP(findrectimer);
00866 
00867       TAU_PROFILE_START(sendtimer);
00868       const NDIndex<Dim> &lf_domain = lf.getAllocated();
00869 #ifdef IPPL_PRINTDEBUG
00870       msg << "Finding send overlap regions for domain " << lf_domain << endl;
00871 #endif
00872       // Loop over the remote domains that touch this local
00873       // domain's guard cells
00874       typename Layout_t::touch_range_dv
00875         range(getLayout().touch_range_rdv(lf_domain));
00876       typename Layout_t::touch_iterator_dv remote_i;
00877       for (remote_i = range.first; remote_i != range.second; ++remote_i) {
00878         // Find the intersection.
00879         NDIndex<Dim> intersection = lf_domain.intersect( (*remote_i).first );
00880         // Find out who owns this remote domain.
00881         int rnode = (*remote_i).second->getNode();
00882 #ifdef IPPL_PRINTDEBUG
00883         msg << "  Overlap domain = " << (*remote_i).first << endl;
00884         msg << "  Inters. domain = " << intersection;
00885         msg << " --> node " << rnode << endl;
00886 #endif
00887         // Create an LField iterator for this intersection region,
00888         // and try to compress it.
00889         // storage for LField compression
00890         T compressed_value;
00891         LFI msgval = lf.begin(intersection, compressed_value);
00892         msgval.TryCompress();
00893 
00894         // Put intersection domain and field data into message
00895         if (!mess[rnode]) mess[rnode] = new Message;
00896         PAssert(mess[rnode]);
00897         mess[rnode]->put(intersection); // puts Dim items in Message
00898         mess[rnode]->put(msgval);       // puts 3 items in Message
00899 #ifdef IPPL_PRINTDEBUG
00900         ndomains[rnode]++;
00901 #endif
00902       }
00903     TAU_PROFILE_STOP(sendtimer);
00904     }
00905     TAU_PROFILE_START(findrectimer);
00906     int remaining = 0;
00907     for (iproc=0; iproc<nprocs; ++iproc)
00908       if (recvmsg[iproc]) ++remaining;
00909     delete [] recvmsg;
00910     TAU_PROFILE_STOP(findrectimer);
00911 
00912     TAU_PROFILE_START(sendtimer);
00913     // Get message tag.
00914     int tag = Ippl::Comm->next_tag( F_GUARD_CELLS_TAG , F_TAG_CYCLE );
00915     TAU_PROFILE_START(sendcommtimer);
00916     // Send all the messages.
00917     for (iproc=0; iproc<nprocs; ++iproc) {
00918       if (mess[iproc]) {
00919 #ifdef IPPL_PRINTDEBUG
00920         msg << "accumGuardCells: Sending message to node " << iproc << endl
00921             << "  number of domains  = " << ndomains[iproc] << endl
00922             << "  number of MsgItems = " << mess[iproc]->size() << endl;
00923 #endif
00924         Ippl::Comm->send(mess[iproc], iproc, tag);
00925       }
00926     }
00927     TAU_PROFILE_STOP(sendcommtimer);
00928     delete [] mess;
00929 #ifdef IPPL_PRINTDEBUG
00930     delete [] ndomains;
00931 #endif
00932     TAU_PROFILE_STOP(sendtimer);
00933 
00934     // ----------------------------------------
00935     // Handle the local fills.
00936     // Loop over all the local arrays.
00937     TAU_PROFILE_START(localstimer);
00938     for (lf_i=begin_if(); lf_i != lf_e; ++lf_i)
00939     {
00940       // Cache some information about this LField.
00941       LField<T,Dim> &lf = *(*lf_i).second;
00942       const NDIndex<Dim>& la = lf.getAllocated();
00943 
00944       // Loop over all the other LFields to establish each LField's
00945       // cache of overlaps.
00946         
00947       // This is an N*N operation, but the expectation is that this will
00948       // pay off since we will reuse this cache quite often.
00949 
00950       if (!lf.OverlapCacheInitialized())
00951       {
00952         for (iterator_if rf_i = begin_if(); rf_i != end_if(); ++rf_i)
00953           if ( rf_i != lf_i )
00954           {
00955             // Cache some info about this LField.
00956             LField<T,Dim> &rf = *(*rf_i).second;
00957             const NDIndex<Dim>& ro = rf.getOwned();
00958 
00959             // If the remote has info we want, then add it to our cache.
00960             if ( la.touches(ro) )
00961               lf.AddToOverlapCache(&rf);
00962           }
00963       }
00964 
00965       // We know at this point that the overlap cache is established.
00966       // Use it.
00967 
00968       for (typename LField<T,Dim>::OverlapIterator rf_i = lf.BeginOverlap();
00969            rf_i != lf.EndOverlap(); ++rf_i)
00970       {
00971         LField<T, Dim> &rf = *(*rf_i);
00972         const NDIndex<Dim>& ro = rf.getOwned();
00973 
00974         TAU_PROFILE_START(localsexprtimer);
00975 
00976         // Find the intersection.
00977         NDIndex<Dim> intersection = la.intersect(ro);
00978                 
00979         // Build iterator for lf guard cells
00980         LFI lhs = lf.begin(intersection);
00981 
00982         // check if we can skip accumulation
00983         if ( !lhs.CanCompress(T()) ) {
00984 
00985           // Make sure the right side is not compressed.
00986           rf.Uncompress();
00987                   
00988           // Build iterator for rf real cells
00989           LFI rhs = rf.begin(intersection);
00990 
00991 #ifdef IPPL_PRINTDEBUG
00992           msg << "  Inters. domain=" << intersection << endl;
00993 #endif
00994           // And do the accumulation
00995           BrickExpression<Dim,LFI,LFI,OpAddAssign>(rhs,lhs).apply();
00996         }    
00997         TAU_PROFILE_STOP(localsexprtimer);
00998       }
00999     }
01000     TAU_PROFILE_STOP(localstimer);
01001 
01002     // ----------------------------------------
01003     // Receive all the messages.
01004     TAU_PROFILE_START(rectimer);
01005     while (remaining>0) {
01006       // Receive the next message.
01007       int any_node = COMM_ANY_NODE;
01008       TAU_PROFILE_START(reccommtimer);
01009       Message* rmess = Ippl::Comm->receive_block(any_node,tag);
01010       PAssert(rmess);
01011       --remaining;
01012       TAU_PROFILE_STOP(reccommtimer);
01013 
01014       // Determine the number of domains being sent
01015       int ndoms = rmess->size() / (Dim + 3);
01016 #ifdef IPPL_PRINTDEBUG
01017       msg << "accumGuardCells: Message received from node " << any_node
01018           << ", number of domains = " << ndoms << endl;
01019 #endif
01020       for (int idom=0; idom<ndoms; ++idom) {
01021         // Extract the intersection domain from the message.
01022         NDIndex<Dim> intersection;
01023         intersection.getMessage(*rmess);
01024 
01025         // Extract the rhs iterator from it.
01026         T compressed_value;
01027         LFI rhs(compressed_value);
01028         rhs.getMessage(*rmess);
01029 #ifdef IPPL_PRINTDEBUG
01030         msg << "Received remote overlap region = " << intersection << endl;
01031 #endif
01032 
01033         // Find the LField it is destined for.
01034         typename ac_recv_type::iterator hit = recv_ac.find( intersection );
01035         PAssert( hit != recv_ac.end() );
01036 
01037         // Build the lhs brick iterator.
01038         LField<T,Dim> &lf = *(*hit).second;
01039 
01040         // Make sure LField is uncompressed
01041         lf.Uncompress();
01042         LFI lhs = lf.begin(intersection);
01043 
01044         // Do the accumulation
01045         BrickExpression<Dim,LFI,LFI,OpAddAssign>(lhs,rhs).apply();
01046 
01047         // Take that entry out of the receive list.
01048         recv_ac.erase( hit );
01049       }
01050       delete rmess;
01051     }
01052     TAU_PROFILE_STOP(rectimer);
01053   }
01054   else { // single-node case
01055     // ----------------------------------------
01056     // Handle the local fills.
01057     // Loop over all the local arrays.
01058     TAU_PROFILE_START(localstimer);
01059     for (lf_i=begin_if(); lf_i != lf_e; ++lf_i)
01060     {
01061       // Cache some information about this LField.
01062       LField<T,Dim> &lf = *(*lf_i).second;
01063       const NDIndex<Dim>& la = lf.getAllocated();
01064 
01065       // Loop over all the other LFields to establish each LField's
01066       // cache of overlaps.
01067         
01068       // This is an N*N operation, but the expectation is that this will
01069       // pay off since we will reuse this cache quite often.
01070 
01071       if (!lf.OverlapCacheInitialized())
01072       {
01073         for (iterator_if rf_i = begin_if(); rf_i != end_if(); ++rf_i)
01074           if ( rf_i != lf_i )
01075           {
01076             // Cache some info about this LField.
01077             LField<T,Dim> &rf = *(*rf_i).second;
01078             const NDIndex<Dim>& ro = rf.getOwned();
01079 
01080             // If the remote has info we want, then add it to our cache.
01081             if ( la.touches(ro) )
01082               lf.AddToOverlapCache(&rf);
01083           }
01084       }
01085 
01086       // We know at this point that the overlap cache is established.
01087       // Use it.
01088 
01089       for (typename LField<T,Dim>::OverlapIterator rf_i = lf.BeginOverlap();
01090            rf_i != lf.EndOverlap(); ++rf_i)
01091       {
01092         LField<T, Dim> &rf = *(*rf_i);
01093         const NDIndex<Dim>& ro = rf.getOwned();
01094 
01095         TAU_PROFILE_START(localsexprtimer);
01096 
01097         // Find the intersection.
01098         NDIndex<Dim> intersection = la.intersect(ro);
01099                 
01100         // Build iterator for lf guard cells
01101         LFI lhs = lf.begin(intersection);
01102 
01103         // Check if we can skip accumulation
01104         if ( !lhs.CanCompress(T()) ) {
01105           // Make sure rf is not compressed.
01106           rf.Uncompress();
01107                   
01108           // Build iterator for rf real cells
01109           LFI rhs = rf.begin(intersection);
01110 
01111 #ifdef IPPL_PRINTDEBUG
01112           msg << "  Inters. domain=" << intersection << endl;
01113 #endif
01114           // And do the accumulation
01115           BrickExpression<Dim,LFI,LFI,OpAddAssign>(rhs,lhs).apply();
01116         }    
01117         TAU_PROFILE_STOP(localsexprtimer);
01118       }
01119     }
01120     TAU_PROFILE_STOP(localstimer);
01121   }
01122 
01123   // since we just modified real cell values, set dirty flag if using
01124   // deferred guard cell fills
01125   setDirtyFlag();
01126   // fill guard cells now, unless we are deferring
01127   fillGuardCellsIfNotDirty();
01128   // try to compress the final result
01129   Compress();
01130 
01131   return;
01132 }
01133 
01135 
01136 // netCDF write operation for 1,2, and 3 Dimensionsal arrays
01137 // currently, only prints out doubles (RTTI would help....)
01138 // look here for more I/O capabilities
01139 
01140 template< class T, unsigned Dim >
01141 void BareField<T,Dim>::write(char* fname) const
01142 {
01143   TAU_TYPE_STRING(taustr, CT(*this) + " void (char * )" );
01144   TAU_PROFILE("BareField::write()", taustr, TAU_FIELD | TAU_IO);
01145   
01146 #ifdef IPPL_NETCDF
01147 
01148   int ncid;                           // NetCDF file ID
01149   int varVID;                         // NetCDF variable ID's
01150 
01151   // Create the NetCDF file, overwrite if already exists:
01152   ncid = nccreate(fname, NC_CLOBBER);
01153   // Select no-fill mode, to avoid wasting time initializing values
01154   // into variables with no UNLIMITED dimension upon creation:
01155   int fillmode = ncsetfill(ncid,NC_NOFILL); //tjwdebug
01156   // Initialize metadata:
01157   // Dimensions:
01158 
01159   //tjw  const NDIndex<Dim> &domain = Layout->get_Domain();
01160   const NDIndex<Dim> &domain = getDomain();
01161 
01162   // Variables:
01163   // Order them with x-direction last, so that IPPL's FORTRAN-ordered LField's
01164   // give the C interface of NetCDF what it expects (last dimension varying 
01165   // fastest).
01166   int dimids[Dim];
01167   if ( Dim==1 )
01168     {
01169       dimids[0] = ncdimdef(ncid, "nx", domain[0].length());
01170     } 
01171   else if ( Dim==2 )
01172     {
01173       dimids[1] = ncdimdef(ncid, "nx", domain[1].length());
01174       dimids[0] = ncdimdef(ncid, "ny", domain[0].length());
01175     } 
01176   else if ( Dim==3 )
01177     {
01178       dimids[2] = ncdimdef(ncid, "nx", domain[0].length());
01179       dimids[1] = ncdimdef(ncid, "ny", domain[1].length());
01180       dimids[0] = ncdimdef(ncid, "nz", domain[2].length());
01181     } 
01182   else if ( Dim>3 )
01183     {
01184       ERRORMSG("BareField: Can't write more than 3 dimensions." << endl);
01185     }
01186   varVID = ncvardef(ncid, fname, NC_DOUBLE, Dim, dimids);
01187   // End (metadata) definition mode and close file (par_io requires):
01188   int errRet = ncendef(ncid);
01189 
01190   // Loop over all the Vnodes, creating an LField in each.
01191   long startIndex[Dim];
01192   long countIndex[Dim];
01193   int rcode;
01194 
01195   for (const_iterator_if l_i = begin_if(); l_i != end_if(); ++l_i)
01196     {
01197       // find the offset within the global space
01198       LField<T,Dim> *ldf = (*l_i).second.get();
01199       const NDIndex<Dim> &Owned = ldf->getOwned();
01200 
01201       int size = 1;
01202       for( int i = 0 ; i < Dim ; i++ )
01203         {
01204           startIndex[Dim - i - 1] = Owned[i].first();
01205           countIndex[Dim - i - 1] = Owned[i].length();
01206           size *= countIndex[Dim - i - 1];
01207         }
01208       double* buffer = new double[size]; 
01209 
01210       int icount = 0;
01211       typename LField<T,Dim>::iterator lf_bi = ldf->begin();
01212       if ( Dim==1 )
01213         {
01214           int n0 = ldf->size(0);
01215           for (int i0=0; i0<n0; ++i0)
01216             buffer[icount++] = lf_bi.offset(i0);
01217         }
01218       else if ( Dim==2 )
01219         {
01220           int n0 = ldf->size(0);
01221           int n1 = ldf->size(1);
01222           for (int i1=0; i1<n1; ++i1)
01223             for (int i0=0; i0<n0; ++i0)
01224               buffer[icount++] = lf_bi.offset(i0,i1);
01225         }
01226       else if ( Dim==3 )
01227         {
01228           int n0 = ldf->size(0);
01229           int n1 = ldf->size(1);
01230           int n2 = ldf->size(2);
01231           for (int i2=0; i2<n2; ++i2)
01232             for (int i1=0; i1<n1; ++i1)
01233               for (int i0=0; i0<n0; ++i0)
01234                 buffer[icount++] = lf_bi.offset(i0,i1,i2);
01235         }
01236       rcode = ncvarput(ncid, varVID, startIndex,countIndex, buffer);
01237       if ( rcode != 0)
01238         {
01239           ERRORMSG("BareField: ncvarput() error, rcode=" << rcode << endl);
01240         }
01241       delete [] buffer;
01242     }
01243   rcode = ncclose(ncid);   // Everybody/master closes file.
01244 #else
01245   ERRORMSG("[Bare]Field::write(\"filename\") requires that you run 'conf' "
01246            << endl << "   "
01247            << "with the NETCDF option when building libippl.a; you haven't."
01248            << endl);
01249 #endif 
01250 }
01251 
01253 
01254 
01255 //
01256 // An output function which writes out the BareField
01257 // a vnode at a time and includes the border information
01258 //
01259 
01260 template< class T, unsigned Dim >
01261 void BareField<T,Dim>::writeb(char* fname)  const
01262 {
01263   TAU_TYPE_STRING(taustr, CT(*this) + " void (char * )" );
01264   TAU_PROFILE("BareField::writeb()", taustr, TAU_FIELD | TAU_IO);
01265   Inform outC(0, fname, Inform::OVERWRITE);
01266 
01267   int icount = 0;
01268   for (const_iterator_if l_i = begin_if(); l_i != end_if(); ++l_i)
01269     {
01270       // find the offset within the global space
01271       LField<T,Dim> *ldf = (*l_i).second.get();
01272       const NDIndex<Dim> &Owned = ldf->getOwned();
01273 
01274       outC << "vnode = " << icount++ << endl;
01275       typename LField<T,Dim>::iterator lf_bi = ldf->begin();
01276       if ( Dim==1 )
01277         {
01278           int n0 = ldf->size(0);
01279           int l0 = -Gc.left(0);
01280           int r0 = n0 + Gc.right(0);
01281           for (int i0=l0; i0<r0; ++i0)
01282             {
01283               outC << " [" << i0;
01284               outC << "]=" << lf_bi.offset(i0);
01285             }
01286         }
01287       else if ( Dim==2 )
01288         {
01289           int n0 = ldf->size(0);
01290           int n1 = ldf->size(1);
01291           int l0 = -Gc.left(0);
01292           int l1 = -Gc.left(1);
01293           int r0 = n0 + Gc.right(0);
01294           int r1 = n1 + Gc.right(1);
01295           for (int i1=l1; i1<r1; ++i1)
01296             {
01297               for (int i0=l0; i0<r0; ++i0)
01298                 {
01299                   outC << " [" << i0;
01300                   outC << "][" << i1;
01301                   outC << "]=" << lf_bi.offset(i0,i1);
01302                 }
01303               outC << endl;
01304             }
01305         }
01306       else if ( Dim==3 )
01307         {
01308           int n0 = ldf->size(0);
01309           int n1 = ldf->size(1);
01310           int n2 = ldf->size(2);
01311           int l0 = -Gc.left(0);
01312           int l1 = -Gc.left(1);
01313           int l2 = -Gc.left(2);
01314           int r0 = n0 + Gc.right(0);
01315           int r1 = n1 + Gc.right(1);
01316           int r2 = n2 + Gc.right(2);
01317           for (int i2=l2; i2<r2; ++i2)
01318             {
01319               for (int i1=l1; i1<r1; ++i1)
01320                 for (int i0=l0; i0<r0; ++i0)
01321                   {
01322                     outC << " [" << i0;
01323                     outC << "][" << i1;
01324                     outC << "][" << i2;
01325                     outC << "]=" << lf_bi.offset(i0,i1,i2);
01326                   }
01327               outC << endl;
01328             }
01329         }
01330       else
01331         {
01332           ERRORMSG(" can not write for larger than three dimensions " << endl);
01333         }
01334       outC << "------------------ " << endl;
01335     }
01336 }
01337 
01339 
01340 //
01341 // Tell a BareField to compress itself.
01342 // Just loop over all of the local fields and tell them to compress.
01343 //
01344 template<class T, unsigned Dim>
01345 void BareField<T,Dim>::Compress() const
01346 {
01347   TAU_TYPE_STRING(taustr, CT(*this) + " void ()" );
01348   TAU_PROFILE("BareField::Compress()", taustr, TAU_FIELD);
01349 
01350   if (!compressible_m) return;
01351 
01352   // This operation is logically const, so cast away const here
01353   BareField<T,Dim>& ncf = const_cast<BareField<T,Dim>&>(*this);
01354   for (iterator_if lf_i = ncf.begin_if(); lf_i != ncf.end_if(); ++lf_i)
01355     (*lf_i).second->TryCompress(isDirty());
01356 }
01357 
01358 template<class T, unsigned Dim>
01359 void BareField<T,Dim>::Uncompress() const
01360 {
01361   TAU_TYPE_STRING(taustr, CT(*this) + " void ()" );
01362   TAU_PROFILE("BareField::Uncompress()", taustr, TAU_FIELD);
01363 
01364   // This operation is logically const, so cast away const here
01365   BareField<T,Dim>& ncf = const_cast<BareField<T,Dim>&>(*this);
01366   for (iterator_if lf_i = ncf.begin_if(); lf_i != ncf.end_if(); ++lf_i)
01367     (*lf_i).second->Uncompress();
01368 }
01369 
01370 //
01371 // Look at all the local fields and calculate the 
01372 // fraction of all the elements that are compressed.
01373 //
01374 template<class T, unsigned Dim>
01375 double BareField<T,Dim>::CompressedFraction() const
01376 {
01377   TAU_TYPE_STRING(taustr, CT(*this) + " double ()" );
01378   TAU_PROFILE("BareField::CompressedFraction()", taustr, TAU_FIELD);
01379 
01380   // elements[0] = total elements
01381   // elements[1] = compressed elements
01382   double elements[2] = { 0.0, 0.0};
01383   // Loop over all of the local fields.
01384   for (const_iterator_if lf_i=begin_if(); lf_i != end_if(); ++lf_i)
01385     {
01386       // Get a reference to the current local field.
01387       LField<T,Dim> &lf = *(*lf_i).second;
01388       // Get the size of this one.
01389       double s = lf.getOwned().size();
01390       // Add that up.
01391       elements[0] += s;
01392       // If it is compressed...
01393       if ( lf.IsCompressed() )
01394         // Add that amount to the compressed total.
01395         elements[1] += s;
01396     }
01397   // Make some space to put the global sum of each of the above.
01398   double reduced_elements[2] = { 0.0, 0.0};
01399   // Do the global reduction.
01400   reduce(elements,elements+2,reduced_elements,OpAddAssign());
01401   // Return the fraction.
01402   return reduced_elements[1]/reduced_elements[0];
01403 }
01404 
01405 
01407 template<class T, unsigned Dim>
01408 void BareField<T,Dim>::Repartition(UserList* userlist)
01409 {
01410   TAU_TYPE_STRING(taustr, CT(*this) + " void (UserList * )" );
01411   TAU_PROFILE("BareField::Repartition()", taustr, TAU_FIELD);
01412 
01413   // Cast to the proper type of FieldLayout.
01414   Layout_t *newLayout = (Layout_t *)( userlist );
01415 
01416   // Build a new temporary field on the new layout.
01417   BareField<T,Dim> tempField( *newLayout, Gc );
01418 
01419   // Copy our data over to the new layout.
01420   tempField = *this;
01421 
01422   // Copy back the pointers to the new local fields.
01423   Locals_ac = tempField.Locals_ac;
01424 
01425   //INCIPPLSTAT(incRepartitions);
01426 }
01427 
01428 
01430 // Tell the subclass that the FieldLayout is being deleted, so
01431 // don't use it anymore
01432 template<class T, unsigned Dim>
01433 void BareField<T,Dim>::notifyUserOfDelete(UserList* userlist)
01434 {
01435   TAU_TYPE_STRING(taustr, CT(*this) + " void (UserList * )" );
01436   TAU_PROFILE("BareField::notifyUserOfDelete()", taustr, TAU_FIELD);
01437 
01438   // just set our layout pointer to NULL; if we try to do anything more
01439   // with this object, other than delete it, a core dump is likely
01440   if (Layout != 0 && Layout->get_Id() == userlist->getUserListID()) {
01441     // the ID refers to this layout, so get rid of it.  It is possible
01442     // the ID refers to some other object, in which case we do not want
01443     // to cast away our Layout object.
01444     Layout = 0;
01445   } else {
01446     // for now, print a warning, until other types of FieldLayoutUser's
01447     // are set up to register with a FieldLayout ... but in general,
01448     // this is OK and this warning should be removed
01449     WARNMSG("BareField: notified of unknown UserList being deleted.");
01450     WARNMSG(endl);
01451   }
01452 }
01453 
01454 // a simple true-false template used to select which loop to implement
01455 // in the BareField::localElement body
01456 template<unsigned Dim, bool exists>
01457 class LFieldDimTag {
01458 #ifdef IPPL_PURIFY
01459   // Add explicit default/copy constructors and op= to avoid UMR's.
01460 public:
01461   LFieldDimTag() {}
01462   LFieldDimTag(const LFieldDimTag<Dim,exists> &) {}
01463   LFieldDimTag<Dim,exists>&
01464   operator=(const LFieldDimTag<Dim,exists> &) { return *this; }
01465 #endif
01466 };
01467 
01468 // 1, 2, 3, and N Dimensional functions
01469 template <class T>
01470 inline
01471 T* PtrOffset(T* ptr, const NDIndex<1U>& pos, const NDIndex<1U>& alloc,
01472              LFieldDimTag<1U,true>) {
01473   T* newptr = ptr + pos[0].first() - alloc[0].first();
01474   return newptr;
01475 }
01476 
01477 template <class T>
01478 inline
01479 T* PtrOffset(T* ptr, const NDIndex<2U>& pos, const NDIndex<2U>& alloc,
01480                LFieldDimTag<2U,true>) {
01481   T* newptr = ptr + pos[0].first() - alloc[0].first() +
01482          alloc[0].length() * ( pos[1].first() - alloc[1].first() );
01483   return newptr;
01484 }
01485 
01486 template <class T>
01487 inline
01488 T* PtrOffset(T* ptr, const NDIndex<3U>& pos, const NDIndex<3U>& alloc,
01489                LFieldDimTag<3U,true>) {
01490   T* newptr = ptr + pos[0].first() - alloc[0].first() +
01491          alloc[0].length() * ( pos[1].first() - alloc[1].first() +
01492          alloc[1].length() * ( pos[2].first() - alloc[2].first() ) );
01493   return newptr;
01494 }
01495 
01496 template <class T, unsigned Dim>
01497 inline
01498 T* PtrOffset(T* ptr, const NDIndex<Dim>& pos, const NDIndex<Dim>& alloc,
01499              LFieldDimTag<Dim,false>) {
01500   T* newptr = ptr;
01501   int n=1;
01502   for (unsigned int d=0; d<Dim; ++d) {
01503     newptr += n * (pos[d].first() - alloc[d].first());
01504     n *= alloc[d].length();
01505   }
01506   return newptr;
01507 }
01508 
01509 
01511 // Get a ref to a single element of the Field; if it is not local to our
01512 // processor, print an error and exit.  This allows the user to provide
01513 // different index values on each node, instead of using the same element
01514 // and broadcasting to all nodes.
01515 template<class T, unsigned Dim> 
01516 T& BareField<T,Dim>::localElement(const NDIndex<Dim>& Indexes) const
01517 {
01518   TAU_PROFILE_STMT(T tauT);
01519   TAU_TYPE_STRING(taustr, "BareField<" + CT(tauT) + ",Dim> (" + CT(Indexes) + " )" );
01520   TAU_PROFILE("localElement()", taustr, TAU_FIELD);
01521 
01522   // Instead of checking to see if the user has asked for one element,
01523   // we will just use the first element specified for each dimension.
01524 
01525   // Is this element here?  Broadcast if it is remote...
01526   // Try and find it in the local BareFields.
01527   const_iterator_if lf_i   = begin_if();
01528   const_iterator_if lf_end = end_if();
01529   for ( ; lf_i != lf_end ; ++lf_i ) {
01530     LField<T,Dim>& lf(*(*lf_i).second);
01531     // End-point "contains" OK since "owned" is unit stride.
01532     if ( lf.getOwned().contains( Indexes ) ) {
01533       // Found it ... first uncompress, then get a pointer to the
01534       // requested element.
01535       lf.Uncompress();
01536       //      return *(lf.begin(Indexes));
01537       // instead of building an iterator, just find the value
01538       NDIndex<Dim> alloc = lf.getAllocated();
01539       T* pdata = PtrOffset(lf.getP(), Indexes, alloc,
01540                            LFieldDimTag<Dim,(Dim<=3)>());
01541       return *pdata;
01542     }
01543   }
01544 
01545   // if we're here, we did not find it ... it must not be local
01546   ERRORMSG("BareField::localElement: attempt to access non-local index ");
01547   ERRORMSG(Indexes << " on node " << Ippl::myNode() << endl);
01548   ERRORMSG("Occurred in a BareField with layout = " << getLayout() << endl);
01549   ERRORMSG("Calling abort ..." << endl);
01550   Ippl::abort();
01551   return *((*((*(begin_if())).second)).begin());
01552 }
01553 
01554 
01556 //
01557 // Get a single value and return it in the given storage.  Whichever
01558 // node owns the value must broadcast it to the other nodes.
01559 //
01561 template <class T, unsigned int Dim>
01562 void BareField<T,Dim>::getsingle(const NDIndex<Dim>& Indexes, T& r) const
01563 {
01564   TAU_TYPE_STRING(taustr, "void (" + CT(Indexes) + ", " + CT(r) + " )" );
01565   TAU_PROFILE("BareField::getsingle()", taustr, TAU_FIELD);
01566 
01567   // Instead of checking to see if the user has asked for one element,
01568   // we will just use the first element specified for each dimension.
01569 
01570   // Check to see if the point is in the boundary conditions.
01571   // Check for: The point is not in the owned domain, but it is in that
01572   //            domain augmented with the boundary condition size.
01573   // If so, use a different algorithm.
01574   if ( (!Indexes.touches(Layout->getDomain())) &&
01575        Indexes.touches(AddGuardCells(Layout->getDomain(),Gc)) ) {
01576     getsingle_bc(Indexes,r);
01577     return;
01578   }
01579 
01580   // create a tag for send/receives if necessary
01581   int tag = Ippl::Comm->next_tag(F_GETSINGLE_TAG, F_TAG_CYCLE);
01582 
01583   // Is it here? Try and find it in the LFields
01584   const_iterator_if lf_end = end_if();
01585   for (const_iterator_if lf_i = begin_if(); lf_i != lf_end ; ++lf_i) {
01586     if ( (*lf_i).second->getOwned().contains( Indexes ) ) {
01587       // Found it.
01588       // Get a pointer to the requested element.
01589       LField<T,Dim>& lf( *(*lf_i).second );
01590       typename LField<T,Dim>::iterator lp = lf.begin(Indexes);
01591 
01592       // Get the requested data.
01593       r = *lp;
01594 
01595       // Broadcast it if we're running in parallel
01596       if (Ippl::getNodes() > 1) {
01597         Message *mess = new Message;
01598 	::putMessage(*mess, r);
01599         Ippl::Comm->broadcast_others(mess, tag);
01600       }
01601 
01602       return;
01603     }
01604   }
01605 
01606   // If we're here, we didn't find it: It is remote.  Wait for a message.
01607   if (Ippl::getNodes() > 1) {
01608     int any_node = COMM_ANY_NODE;
01609     Message *mess = Ippl::Comm->receive_block(any_node,tag);
01610     ::getMessage(*mess, r);
01611     delete mess;
01612   }
01613   else {
01614     // we did not find it, and we only have one node, so this must be
01615     // an error.
01616     ERRORMSG("Illegal single-value index " << Indexes);
01617     ERRORMSG(" in BareField::getsingle()" << endl);
01618   }
01619 }
01620 
01622 
01623 //
01624 // Get a single value from the BareField when we know that the 
01625 // value is in the boundary condition area.
01626 // We use a more robust but slower algorithm here because there could 
01627 // be redundancy.
01628 //
01629 
01630 template<class T, unsigned D>
01631 void
01632 BareField<T,D>::getsingle_bc(const NDIndex<D>& Indexes, T& r) const
01633 {
01634   // We will look through everything to find who is the authority
01635   // for the data.  We look through the locals to see if we have it,
01636   // and we look through the remotes to see if someone else has it.
01637   // The lowest numbered processor is the authority.
01638   int authority_proc = -1;
01639 
01640   // Look through all the locals to try and find it.
01641   // Loop over all the LFields.
01642   const_iterator_if lf_end = end_if();
01643   for (const_iterator_if lf_i = begin_if(); lf_i != lf_end ; ++lf_i) {
01644     // Is it in this LField?
01645     if ( (*lf_i).second->getAllocated().contains( Indexes ) ) {
01646       // Found it.  As far as we know now, this is the authority proc.
01647       authority_proc = Ippl::myNode();
01648 
01649       // Get the requested data.
01650       LField<T,D>& lf( *(*lf_i).second );
01651       typename LField<T,D>::iterator lp = lf.begin(Indexes);
01652       r = *lp;
01653 
01654       // If we're not the only guy on the block, go on to find the remotes.
01655       if (Ippl::getNodes() > 1) 
01656         break;
01657 
01658       // Otherwise, we're done.
01659       return;
01660     }
01661   }
01662 
01663   // Look through all the remotes to see who else has it.
01664   // Loop over all the remote LFields that touch Indexes.
01665   typename FieldLayout<D>::touch_range_dv range = 
01666     Layout->touch_range_rdv(Indexes,Gc);
01667   for (typename FieldLayout<D>::touch_iterator_dv p=range.first; 
01668        p!=range.second;++p) 
01669     // See if anybody has a lower processor number.
01670     if ( authority_proc > (*p).second->getNode() )
01671       // Someone else is the authority.
01672       authority_proc = (*p).second->getNode();
01673 
01674   // create a tag for the broadcast.
01675   int tag = Ippl::Comm->next_tag(F_GETSINGLE_TAG, F_TAG_CYCLE);
01676 
01677   if ( authority_proc == Ippl::myNode() ) {
01678     // If we're the authority, broadcast.
01679     Message *mess = new Message;
01680     ::putMessage(*mess, r);
01681     Ippl::Comm->broadcast_others(mess, tag);
01682   }
01683   else {
01684     // If someone else is the authority, receive the message.
01685     Message *mess = Ippl::Comm->receive_block(authority_proc,tag);
01686     ::getMessage(*mess, r);
01687     delete mess;
01688   }
01689   
01690 }
01691 
01692 /***************************************************************************
01693  * $RCSfile: BareField.cpp,v $   $Author: adelmann $
01694  * $Revision: 1.1.1.1 $   $Date: 2003/01/23 07:40:26 $
01695  * IPPL_VERSION_ID: $Id: BareField.cpp,v 1.1.1.1 2003/01/23 07:40:26 adelmann Exp $ 
01696  ***************************************************************************/

Generated on Mon Jan 16 13:23:43 2006 for IPPL by  doxygen 1.4.6