// -*- C++ -*-
/***************************************************************************
 *
 * The IPPL Framework
 *
 *
 * Visit http://people.web.psi.ch/adelmann/ for more details
 *
 ***************************************************************************/

#ifndef COMMUNICATE_H
#define COMMUNICATE_H

/***************************************************************************
 * Communicate.h - communications object for use with Ippl framework.  Allows
 * user to establish id's for available nodes, establish connections, and
 * send/receive data.
 ***************************************************************************/

// include files
#include "Message/TagMaker.h"
#include "Message/Tags.h"

// C library routines used by the inline buffer helpers in this header
// (malloc/free in makebuffer/freebuffer, memcpy in pack/unpack) and NULL.
#include <stdlib.h>
#include <string.h>

#ifdef IPPL_STDSTL
#include <map>
#include <vector>
#include <utility>
using std::map;
using std::vector;
using std::pair;
#else
#include <map.h>
#include <vector.h>
#include <pair.h>
#endif // IPPL_STDSTL

#ifdef IPPL_USE_STANDARD_HEADERS
#include <iostream>
using namespace std;
#else
#include <iostream.h>
#endif

// forward declarations
class Message;
class Communicate;
ostream& operator<<(ostream&, const Communicate&);

// special codes used as 'wildcards' to match any node or tag
const int COMM_ANY_NODE = (-1);
const int COMM_ANY_TAG = (-1);


// A simple class used to store information for caching sent messages.  This
// is only used if the 'retransmit' option is active.
00053 class CommSendInfo 00054 { 00055 public: 00056 CommSendInfo() 00057 : size_m(0), buf_m(0) 00058 { 00059 } 00060 00061 CommSendInfo(int size, char *buf, int node) 00062 : size_m(size), buf_m(buf), node_m(node) 00063 { 00064 } 00065 00066 CommSendInfo(const CommSendInfo &c) 00067 : size_m(c.size_m), buf_m(c.buf_m), node_m(c.node_m) 00068 { 00069 } 00070 00071 ~CommSendInfo() 00072 { 00073 // the user is actually responsible for freeing the buffer. We 00074 // do not do this automatically here 00075 } 00076 00077 CommSendInfo &operator=(const CommSendInfo &c) 00078 { 00079 size_m = c.size_m; 00080 buf_m = c.buf_m; 00081 node_m = c.node_m; 00082 return *this; 00083 } 00084 00085 int size() const { return size_m; } 00086 00087 int node() const { return node_m; } 00088 00089 char *buf() { return buf_m; } 00090 const char *buf() const { return buf_m; } 00091 00092 void freebuf() 00093 { 00094 if (buf_m != 0) 00095 delete [] buf_m; 00096 buf_m = 0; 00097 } 00098 00099 private: 00100 int size_m; 00101 int node_m; 00102 char *buf_m; 00103 }; 00104 00105 00106 // The base class for all specific Communicate objects 00107 class Communicate : public TagMaker 00108 { 00109 00110 public: 00111 // default error codes, may be overridden by inherited classes. 00112 enum CommErrors { COMM_NOERROR, COMM_ERROR, COMM_NOSEND, COMM_NORECEIVE }; 00113 00114 // special tags used by this class ... 32000 is arbitrary 00115 enum CommTags { COMM_HOSTS_TAG = 32000, COMM_DIE_TAG, COMM_SEND_TAG }; 00116 00117 // special codes used as 'wildcards' to match any node or tag 00118 // These are listed again because they should be here, but the global 00119 // values are kept for compatibility. 00120 enum CommCodes { COMM_ANY_NODE = (-1), COMM_ANY_TAG = (-1) }; 00121 00122 public: 00123 // constructor and destructor 00124 // constructor arguments: command-line args, and number of processes 00125 // to start (if < 0, start the 'default' number, i.e. 
the number of 00126 // hosts in a PVM virtual machine, the number of nodes in an O2K, etc) 00127 Communicate(int argc = 0, char** argv = NULL, int procs = (-1)); 00128 virtual ~Communicate(void); 00129 00130 // return the name of this item 00131 virtual const char *name() const { return "Serial"; } 00132 00133 // return info about connections in general 00134 int getNodes() const { return TotalNodes; } 00135 int getContexts(const int n) const { return Contexts[n]; } 00136 int getProcesses(const int n, const int c) const { return Processes[n][c]; } 00137 int myNode() const { return myHost; } 00138 int getError() const { return ErrorStatus; } 00139 int getReceived() const { return recMsgList.size(); } 00140 00141 // 00142 // nonvirtual routines to send/receive data 00143 // 00144 00145 // send data to another node. Returns success (T or F). 00146 // last argument specifies whether to delete the Message after sending 00147 // (if message is for another node). Note that if the send is not 00148 // successful, the message will NOT be deleted, regardless of delmsg. 00149 bool send(Message *, int node, int tag, bool delmsg = true); 00150 00151 // receive data from another node. Returns newly created Message object 00152 // with received message, or NULL if no message is available. 00153 // If node is < 0, this will receive the next message with the given tag 00154 // from any node. If tag < 0, this will receive the next message with 00155 // any tag from the given node. If both are < 0, this will receive the 00156 // next message, period. node and tag are passed by reference; if either 00157 // is < 0, and a message is received, they are changed to their actual 00158 // values. 00159 Message *receive(int& node, int& tag); 00160 00161 // a blocking version of receive; 00162 Message *receive_block(int& node, int& tag); 00163 00164 // 00165 // virtual routines to broadcast data 00166 // 00167 00168 // broadcast the current message to other nodes. 
00169 // Return number of nodes actually sent to. 00170 // The first version sends to all nodes including this node. 00171 // The second version sends to all nodes except this node. 00172 // The first argument is the Message; the last argument is the tag. 00173 virtual int broadcast_all(Message *, int); 00174 virtual int broadcast_others(Message *, int, bool delmsg=true); 00175 00176 00177 // 00178 // routines to synchronize processors at a barrier 00179 // 00180 00181 // Synchronize all processors (everybody waits for everybody 00182 // else to get here before returning to calling function). 00183 void barrier(void); 00184 00185 // 00186 // virtual routines to deal with memory management 00187 // 00188 00189 // clean up after a Message has been used (called by Message). By 00190 // default, does nothing. 00191 virtual void cleanupMessage(void *); 00192 00193 protected: 00194 // struct used to store messages, tags, and nodes 00195 struct MessageData { 00196 int node; // sending/receiving node 00197 int tag; // tag of the message 00198 Message *msg; // pointer to the message itself 00199 MessageData(int n, int t, Message *m) : node(n),tag(t),msg(m) {} 00200 MessageData() : node(-1), tag(-1), msg(0) { } 00201 MessageData(const MessageData& m) : node(m.node),tag(m.tag),msg(m.msg) {} 00202 ~MessageData() {} 00203 }; 00204 00205 // a list of messages which have already been received, but not yet 00206 // delivered 00207 vector<MessageData> recMsgList; 00208 00209 // the following items should be filled in by the derived classes 00210 int TotalNodes; // number of nodes available (0 ... # nodes-1) 00211 int myHost; // which node am I? 
00212 int ErrorStatus; // error code, from above enumeration 00213 vector<int> Contexts; // the number of contexts per node 00214 vector< vector<int> > Processes; // number of running processes per context 00215 00216 // An integer message number identifier; this is included in each 00217 // message, and continually increases as more messages are sent. 00218 typedef long MsgNum_t; 00219 MsgNum_t nextMsgNum; 00220 00221 // An optional sent-message cache, used to attempt to retransmit 00222 // messages if they are corrupted in-transit. Messages are keyed on 00223 // a message number, which is is unique for each message. 00224 typedef map<MsgNum_t, CommSendInfo> SentCache_t; 00225 SentCache_t sentMsgCache; 00226 00227 // a list of things to resend at the next opportunity 00228 vector<MsgNum_t> resendList; 00229 00230 // a list of messages which have been received OK 00231 vector<MsgNum_t> sentOKList; 00232 00233 // a list of messages which should be cleared out on other nodes 00234 vector<pair<int,MsgNum_t> > informOKList; 00235 00236 // a list of requests we must make to other nodes to resend messages 00237 vector<pair<int,MsgNum_t> > requestList; 00238 00239 // add a new message to the received message queues. Return success. 00240 // arguments: message, sending node, tag 00241 bool add_msg(Message *, int, int); 00242 00243 // Looks for a message in the message queue from the specified node 00244 // and tag. This understands wildcards for node and tag. 00245 // Returns a pointer to the Message object found, and sets node and 00246 // tag equal to the proper values. Also, this will remove the item from 00247 // the queue. 00248 Message* find_msg(int&, int&); 00249 00250 // 00251 // implementation-specific routines (which begin with 'my') 00252 // these should be provided in a derived class, and contain the 00253 // comm-library-specific code 00254 // 00255 00256 // send a message ... 
arguments are the Message itself, the 00257 // destination node, the 'user' tag, and the 'encoding' tag. 00258 // Messages should be sent via the underlying mechanism by using the 00259 // encoding tag (one of the COMM_ tags), 00260 // and should embed the information about what the user 00261 // tag is in the data sent between nodes. Return success. 00262 virtual bool mysend(Message *, int node, int utag, int etag); 00263 00264 // receive a message from the given node and user tag. Return a NEW 00265 // Message object if a message arrives, or NULL if no message available. 00266 // node will be set to the node from which the message was sent. 00267 // tag will be set to the 'user tag' for that message. 00268 // etag is the 'encoding' tag, and must be one of the COMM_ tags. 00269 // Only message sent via the underlying mechanism with the 00270 // given etag are checked. When one is found, the user tag and sending 00271 // node are extracted from the sent data. 00272 // If node = COMM_ANY_NODE, checks for messages from any node. 00273 // If tag = COMM_ANY_TAG, checks for messages with any user tag. 00274 virtual Message *myreceive(int& node, int& tag, int etag); 00275 00276 // Synchronize all processors (everybody waits for everybody 00277 // else to get here before returning to calling function). 00278 virtual void mybarrier(void); 00279 00280 // resent a message buffer that has been previously packed and copied 00281 // into the provided buffer. Return success. 
00282 virtual bool resend(void *buf, int size, int node, int etag); 00283 00284 // 00285 // utility functions used to serialize data into and out of byte buffers 00286 // 00287 00288 // standard way to create and free buffer storage 00289 static inline void *makebuffer(int size) { return malloc(size); } 00290 static inline void freebuffer(void *buf) { free(buf); } 00291 00292 // compute the size of storage needed to add 'size' bytes to a buffer, 00293 // in order to keep everything word-aligned 00294 #if defined(IPPL_LONGLONG) 00295 static inline unsigned int wordround(int size) { 00296 return sizeof(long long) * 00297 ((size + sizeof(long long) - 1)/sizeof(long long)); 00298 } 00299 #else 00300 static inline unsigned int wordround(int size) { 00301 return sizeof(long) * 00302 ((size + sizeof(long) - 1)/sizeof(long)); 00303 } 00304 #endif 00305 00306 // compute a wordround value for 'size' bytes, then add that to the 00307 // given 'pos' pointer 00308 static inline void addwordround(void * &pos, int size) { 00309 pos = static_cast<void *>(wordround(size) + static_cast<char *>(pos)); 00310 } 00311 00312 // memcpy data into the given location, and then increment the pointer 00313 static inline void pack(void *packdata, void * &pos, int size) { 00314 memcpy(pos, packdata, size); 00315 addwordround(pos, size); 00316 } 00317 00318 // memcpy data out of a given location to another, updating 'pos' 00319 static inline void unpack(void * &pos, void *packdata, int size) { 00320 memcpy(packdata, pos, size); 00321 addwordround(pos, size); 00322 } 00323 00324 // 00325 // utility functions used in packing and unpacking Message data 00326 // 00327 00328 // calculate how big the buffer must be to send the given message 00329 int find_msg_length(Message &); 00330 00331 // put data from the given Message into the given buffer, with tag value. 00332 // the final arguments are the buffer size, in bytes, and the dest node. 
00333 void fill_msg_buffer(void *, Message &, int, int, int); 00334 00335 // take data out of the current receive buf and create a new Message 00336 Message *unpack_message(int &node, int &tag, void *pos); 00337 00338 // 00339 // utility functions used for message caching/retransmit 00340 // 00341 00342 // put the given message buffer in the sent-message cache, as a new 00343 // CommSendInfo object storing the buffer and other information. 00344 void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node); 00345 00346 // send off a request to have this message retransmitted to us 00347 void request_retransmission(int node, MsgNum_t mnum); 00348 00349 // resend the data for message mnum ... calls the virtual 'resend' 00350 void perform_resend(MsgNum_t mnum); 00351 00352 // get the resend information from a buffer sent in a message requesting 00353 // retransmission 00354 void unpack_retransmission_request(int nitems, void *pos); 00355 00356 // tell the sender that we received this message OK 00357 void send_ok_message(int node, MsgNum_t mnum); 00358 00359 // unpack message with a list of OK message numbers, and delete them 00360 // from our cache 00361 void clear_ok_messages(int nitems, void *pos); 00362 00363 // remove a single OK message 00364 void remove_single_ok_message(MsgNum_t mnum); 00365 00366 // process list of resend requests 00367 void process_resend_requests(); 00368 }; 00369 00370 #endif // COMMUNICATE_H 00371 00372 00373 /*************************************************************************** 00374 * $RCSfile: Communicate.h,v $ $Author: adelmann $ 00375 * $Revision: 1.1.1.1 $ $Date: 2003/01/23 07:40:28 $ 00376 * IPPL_VERSION_ID: $Id: Communicate.h,v 1.1.1.1 2003/01/23 07:40:28 adelmann Exp $ 00377 ***************************************************************************/