src/Message/Communicate.h

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 /***************************************************************************
00003  *
00004  * The IPPL Framework
00005  * 
00006  *
00007  * Visit http://people.web.psi.ch/adelmann/ for more details
00008  *
00009  ***************************************************************************/
00010 
00011 #ifndef COMMUNICATE_H
00012 #define COMMUNICATE_H
00013 
/***************************************************************************
 * Communicate.h - communications object for use with the IPPL framework.
 * Allows the user to establish IDs for the available nodes, establish
 * connections, and send/receive data.
 ***************************************************************************/
00019 
// include files
#include "Message/TagMaker.h"
#include "Message/Tags.h"

// C library headers: malloc/free (makebuffer/freebuffer), memcpy
// (pack/unpack) and NULL are used directly in this header.
#include <stdlib.h>
#include <string.h>

#ifdef IPPL_STDSTL
#include <vector>
#include <utility>
#include <map>
using std::vector;
using std::pair;
using std::map;
#else
#include <vector.h>
#include <pair.h>
#include <map.h>
#endif // IPPL_STDSTL

#ifdef IPPL_USE_STANDARD_HEADERS
#include <iostream>
using namespace std;
#else
#include <iostream.h>
#endif
00040 
00041 // forward declarations
00042 class Message;
00043 class Communicate;
00044 ostream& operator<<(ostream&, const Communicate&);
00045 
00046 // special codes used as 'wildcards' to match any node or tag
00047 const int COMM_ANY_NODE = (-1);
00048 const int COMM_ANY_TAG  = (-1);
00049 
00050 
// A simple class used to store information for caching sent messages.  This
// is only used if the 'retransmit' option is active.
//
// A CommSendInfo records three things: the size of a packed message buffer,
// a pointer to that buffer, and the node associated with the message.
//
// Copies are shallow.  Copy construction and assignment duplicate the
// pointer, not the buffer, and the destructor never frees the buffer.  The
// user is responsible for releasing it by calling freebuf() on exactly one
// of the copies.
class CommSendInfo
{
public:
  // Create an empty record with zero size, no buffer, and node -1
  // (no node assigned).
  CommSendInfo()
    : size_m(0), node_m(-1), buf_m(0)
    {
    }

  // Record a 'size'-byte buffer associated with 'node'.  The buffer is not
  // copied.  freebuf() releases it with delete [], so it is expected to have
  // been allocated with new char[] -- NOTE(review): confirm at call sites.
  CommSendInfo(int size, char *buf, int node)
    : size_m(size), node_m(node), buf_m(buf)
    {
    }

  // The implicitly generated copy constructor, copy assignment and
  // destructor perform exactly the shallow copy described above, and they
  // never free the buffer.

  // size of the cached buffer, in bytes
  int size() const { return size_m; }

  // node associated with the cached message
  int node() const { return node_m; }

  // the cached buffer (0 if none, or after freebuf())
  char *buf() { return buf_m; }
  const char *buf() const { return buf_m; }

  // Release the buffer with delete [] and reset the pointer to 0.
  // Calling it again afterwards does nothing.
  void freebuf()
    {
      delete [] buf_m;   // delete [] of a null pointer is a no-op
      buf_m = 0;
    }

private:
  // Declaration order matches the constructors' initializer lists.
  int size_m;    // buffer size in bytes
  int node_m;    // associated node (-1 when default-constructed)
  char *buf_m;   // cached buffer; not freed automatically (see freebuf)
};
00104 
00105 
00106 // The base class for all specific Communicate objects
00107 class Communicate : public TagMaker
00108 {
00109 
00110 public:
00111   // default error codes, may be overridden by inherited classes.
00112   enum CommErrors { COMM_NOERROR, COMM_ERROR, COMM_NOSEND, COMM_NORECEIVE };
00113 
00114   // special tags used by this class ... 32000 is arbitrary
00115   enum CommTags { COMM_HOSTS_TAG = 32000, COMM_DIE_TAG, COMM_SEND_TAG };
00116 
00117   // special codes used as 'wildcards' to match any node or tag
00118   // These are listed again because they should be here, but the global
00119   // values are kept for compatibility.
00120   enum CommCodes { COMM_ANY_NODE = (-1), COMM_ANY_TAG = (-1) };
00121 
00122 public:
00123   // constructor and destructor
00124   // constructor arguments: command-line args, and number of processes
00125   // to start (if < 0, start the 'default' number, i.e. the number of
00126   // hosts in a PVM virtual machine, the number of nodes in an O2K, etc)
00127   Communicate(int argc = 0, char** argv = NULL, int procs = (-1));
00128   virtual ~Communicate(void);
00129 
00130   // return the name of this item
00131   virtual const char *name() const { return "Serial"; }
00132 
00133   // return info about connections in general
00134   int getNodes() const { return TotalNodes; }
00135   int getContexts(const int n) const { return Contexts[n]; }
00136   int getProcesses(const int n, const int c) const { return Processes[n][c]; }
00137   int myNode() const { return myHost; }
00138   int getError() const { return ErrorStatus; }
00139   int getReceived() const { return recMsgList.size(); }
00140 
00141   //
00142   //    nonvirtual routines to send/receive data
00143   //
00144  
00145   // send data to another node.  Returns success (T or F).
00146   // last argument specifies whether to delete the Message after sending
00147   // (if message is for another node).  Note that if the send is not
00148   // successful, the message will NOT be deleted, regardless of delmsg.
00149   bool send(Message *, int node, int tag, bool delmsg = true);
00150 
00151   // receive data from another node.  Returns newly created Message object
00152   // with received message, or NULL if no message is available.
00153   // If node is < 0, this will receive the next message with the given tag
00154   // from any node.  If tag < 0, this will receive the next message with
00155   // any tag from the given node.  If both are < 0, this will receive the
00156   // next message, period.  node and tag are passed by reference; if either
00157   // is < 0, and a message is received, they are changed to their actual
00158   // values.
00159   Message *receive(int& node, int& tag);
00160  
00161   // a blocking version of receive;
00162   Message *receive_block(int& node, int& tag);
00163 
00164   //
00165   //    virtual routines to broadcast data
00166   //
00167  
00168   // broadcast the current message to other nodes.
00169   // Return number of nodes actually sent to.
00170   // The first version sends to all nodes including this node.
00171   // The second version sends to all nodes except this node.
00172   // The first argument is the Message; the last argument is the tag.
00173   virtual int broadcast_all(Message *, int);
00174   virtual int broadcast_others(Message *, int, bool delmsg=true);
00175 
00176 
00177   //
00178   //    routines to synchronize processors at a barrier
00179   //
00180 
00181   // Synchronize all processors (everybody waits for everybody
00182   // else to get here before returning to calling function).
00183   void barrier(void);
00184 
00185   //
00186   //    virtual routines to deal with memory management
00187   //
00188 
00189   // clean up after a Message has been used (called by Message).  By
00190   // default, does nothing.
00191   virtual void cleanupMessage(void *);
00192 
00193 protected:
00194   // struct used to store messages, tags, and nodes
00195   struct MessageData {
00196     int node;                   // sending/receiving node
00197     int tag;                    // tag of the message
00198     Message *msg;               // pointer to the message itself
00199     MessageData(int n, int t, Message *m) : node(n),tag(t),msg(m) {}
00200     MessageData() : node(-1), tag(-1), msg(0) { }
00201     MessageData(const MessageData& m) : node(m.node),tag(m.tag),msg(m.msg) {}
00202     ~MessageData() {}
00203   };
00204 
00205   // a list of messages which have already been received, but not yet
00206   // delivered
00207   vector<MessageData> recMsgList;
00208 
00209   // the following items should be filled in by the derived classes
00210   int TotalNodes;               // number of nodes available (0 ... # nodes-1)
00211   int myHost;                   // which node am I?
00212   int ErrorStatus;              // error code, from above enumeration
00213   vector<int> Contexts;         // the number of contexts per node
00214   vector< vector<int> > Processes;   // number of running processes per context
00215 
00216   // An integer message number identifier; this is included in each
00217   // message, and continually increases as more messages are sent.
00218   typedef long MsgNum_t;
00219   MsgNum_t nextMsgNum;
00220 
00221   // An optional sent-message cache, used to attempt to retransmit
00222   // messages if they are corrupted in-transit.  Messages are keyed on
00223   // a message number, which is is unique for each message.
00224   typedef map<MsgNum_t, CommSendInfo> SentCache_t;
00225   SentCache_t sentMsgCache;
00226 
00227   // a list of things to resend at the next opportunity
00228   vector<MsgNum_t> resendList;
00229 
00230   // a list of messages which have been received OK
00231   vector<MsgNum_t> sentOKList;
00232 
00233   // a list of messages which should be cleared out on other nodes
00234   vector<pair<int,MsgNum_t> > informOKList;
00235 
00236   // a list of requests we must make to other nodes to resend messages
00237   vector<pair<int,MsgNum_t> > requestList;
00238   
00239   // add a new message to the received message queues.  Return success.
00240   // arguments: message, sending node, tag
00241   bool add_msg(Message *, int, int);
00242 
00243   // Looks for a message in the message queue from the specified node
00244   // and tag.  This understands wildcards for node and tag.
00245   // Returns a pointer to the Message object found, and sets node and
00246   // tag equal to the proper values.  Also, this will remove the item from
00247   // the queue.
00248   Message* find_msg(int&, int&);
00249 
00250   //
00251   // implementation-specific routines (which begin with 'my')
00252   //    these should be provided in a derived class, and contain the
00253   //    comm-library-specific code
00254   //
00255 
00256   // send a message ... arguments are the Message itself, the
00257   // destination node, the 'user' tag, and the 'encoding' tag.
00258   // Messages should be sent via the underlying mechanism by using the
00259   // encoding tag (one of the COMM_ tags),
00260   // and should embed the information about what the user
00261   // tag is in the data sent between nodes.  Return success.
00262   virtual bool mysend(Message *, int node, int utag, int etag);
00263 
00264   // receive a message from the given node and user tag.  Return a NEW
00265   // Message object if a message arrives, or NULL if no message available.
00266   // node will be set to the node from which the message was sent.
00267   // tag will be set to the 'user tag' for that message.
00268   // etag is the 'encoding' tag, and must be one of the COMM_ tags.
00269   // Only message sent via the underlying mechanism with the
00270   // given etag are checked.  When one is found, the user tag and sending
00271   // node are extracted from the sent data.
00272   // If node = COMM_ANY_NODE, checks for messages from any node.
00273   // If tag = COMM_ANY_TAG, checks for messages with any user tag.
00274   virtual Message *myreceive(int& node, int& tag, int etag);
00275 
00276   // Synchronize all processors (everybody waits for everybody
00277   // else to get here before returning to calling function).
00278   virtual void mybarrier(void);
00279 
00280   // resent a message buffer that has been previously packed and copied
00281   // into the provided buffer.  Return success.
00282   virtual bool resend(void *buf, int size, int node, int etag);
00283 
00284   //
00285   // utility functions used to serialize data into and out of byte buffers
00286   //
00287 
00288   // standard way to create and free buffer storage
00289   static inline void *makebuffer(int size) { return malloc(size); }
00290   static inline void freebuffer(void *buf) { free(buf); }
00291 
00292   // compute the size of storage needed to add 'size' bytes to a buffer,
00293   // in order to keep everything word-aligned
00294 #if defined(IPPL_LONGLONG)
00295   static inline unsigned int wordround(int size) {
00296     return sizeof(long long) *
00297       ((size + sizeof(long long) - 1)/sizeof(long long));
00298   }
00299 #else
00300   static inline unsigned int wordround(int size) {
00301     return sizeof(long) *
00302       ((size + sizeof(long) - 1)/sizeof(long));
00303   }
00304 #endif
00305 
00306   // compute a wordround value for 'size' bytes, then add that to the
00307   // given 'pos' pointer
00308   static inline void addwordround(void * &pos, int size) {
00309     pos = static_cast<void *>(wordround(size) + static_cast<char *>(pos));
00310   }
00311 
00312   // memcpy data into the given location, and then increment the pointer
00313   static inline void pack(void *packdata, void * &pos, int size) {
00314     memcpy(pos, packdata, size);
00315     addwordround(pos, size);
00316   }
00317 
00318   // memcpy data out of a given location to another, updating 'pos'
00319   static inline void unpack(void * &pos, void *packdata, int size) {
00320     memcpy(packdata, pos, size);
00321     addwordround(pos, size);
00322   }
00323 
00324   //
00325   // utility functions used in packing and unpacking Message data
00326   //
00327 
00328   // calculate how big the buffer must be to send the given message
00329   int find_msg_length(Message &);
00330 
00331   // put data from the given Message into the given buffer, with tag value.
00332   // the final arguments are the buffer size, in bytes, and the dest node.
00333   void fill_msg_buffer(void *, Message &, int, int, int);
00334 
00335   // take data out of the current receive buf and create a new Message
00336   Message *unpack_message(int &node, int &tag, void *pos);
00337 
00338   //
00339   // utility functions used for message caching/retransmit
00340   //
00341 
00342   // put the given message buffer in the sent-message cache, as a new
00343   // CommSendInfo object storing the buffer and other information.
00344   void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node);
00345 
00346   // send off a request to have this message retransmitted to us
00347   void request_retransmission(int node, MsgNum_t mnum);
00348 
00349   // resend the data for message mnum ... calls the virtual 'resend'
00350   void perform_resend(MsgNum_t mnum);
00351 
00352   // get the resend information from a buffer sent in a message requesting
00353   // retransmission
00354   void unpack_retransmission_request(int nitems, void *pos);
00355 
00356   // tell the sender that we received this message OK
00357   void send_ok_message(int node, MsgNum_t mnum);
00358 
00359   // unpack message with a list of OK message numbers, and delete them
00360   // from our cache
00361   void clear_ok_messages(int nitems, void *pos);
00362 
00363   // remove a single OK message
00364   void remove_single_ok_message(MsgNum_t mnum);
00365 
00366   // process list of resend requests
00367   void process_resend_requests();
00368 };
00369 
00370 #endif // COMMUNICATE_H
00371 
00372 
00373 /***************************************************************************
00374  * $RCSfile: Communicate.h,v $   $Author: adelmann $
00375  * $Revision: 1.1.1.1 $   $Date: 2003/01/23 07:40:28 $
00376  * IPPL_VERSION_ID: $Id: Communicate.h,v 1.1.1.1 2003/01/23 07:40:28 adelmann Exp $ 
00377  ***************************************************************************/

Generated on Mon Jan 16 13:23:52 2006 for IPPL by  doxygen 1.4.6