OPAL (Object Oriented Parallel Accelerator Library)  2.2.0
OPAL
Communicate.h
Go to the documentation of this file.
1 // -*- C++ -*-
2 /***************************************************************************
3  *
4  * The IPPL Framework
5  *
6  *
7  * Visit http://people.web.psi.ch/adelmann/ for more details
8  *
9  ***************************************************************************/
10 
11 #ifndef COMMUNICATE_H
12 #define COMMUNICATE_H
13 
14 /***************************************************************************
15  * Communicate.h - communications object for use with the Ippl framework.
16  * Allows the user to establish IDs for available nodes, establish
17  * connections, and send/receive data.
18  ***************************************************************************/
19 
20 // include files
#include "Message/TagMaker.h"
#include "Message/Tags.h"

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <utility>
#include <vector>

#include <mpi.h>
33 
// Forward declarations of the message type, the communicator, and its
// stream-output operator (defined elsewhere).
class Message;
class Communicate;
std::ostream &operator<<(std::ostream &, const Communicate &);

// Wildcard codes (both -1) meaning "match any node" / "match any tag".
constexpr int COMM_ANY_NODE = -1;
constexpr int COMM_ANY_TAG = -1;
42 
43 
// A simple class used to store information for caching sent messages. This
// is only used if the 'retransmit' option is active.
//
// CommSendInfo does NOT take ownership of its buffer: copies share the same
// pointer and the destructor leaves the buffer alone. The buffer is released
// only by an explicit call to freebuf().
class CommSendInfo
{
public:
    // Empty record: no buffer, zero size, and node -1 ("no node", the same
    // value MessageData uses). Previously node_m was left uninitialized, so
    // copying a default-constructed object read an indeterminate value.
    CommSendInfo()
        : size_m(0), node_m(-1), buf_m(nullptr)
    {
    }

    // Record a buffer of 'size' bytes associated with destination 'node'.
    // The pointer is stored as-is; no copy of the data is made.
    CommSendInfo(int size, char *buf, int node)
        : size_m(size), node_m(node), buf_m(buf)
    {
    }

    // Shallow, member-wise copy: both objects refer to the same buffer.
    CommSendInfo(const CommSendInfo &c) = default;
    CommSendInfo &operator=(const CommSendInfo &c) = default;

    // The user is responsible for freeing the buffer (via freebuf());
    // it is deliberately not released here.
    ~CommSendInfo() = default;

    // Number of bytes in the cached buffer.
    int size() const
    {
        return size_m;
    }

    // Node this buffer is associated with (-1 for a default-constructed record).
    int node() const
    {
        return node_m;
    }

    // Raw access to the cached buffer (may be null).
    char *buf()
    {
        return buf_m;
    }
    const char *buf() const
    {
        return buf_m;
    }

    // Release the buffer with delete[] and null the pointer, so repeated
    // calls are harmless. The buffer must therefore come from new[].
    // (delete[] on a null pointer is a no-op, so no explicit check is needed.)
    void freebuf()
    {
        delete [] buf_m;
        buf_m = nullptr;
    }

private:
    int size_m;
    int node_m;
    char *buf_m;
};
109 
110 
111 // The base class for all specific Communicate objects
112 class Communicate : public TagMaker
113 {
114 
115 public:
116  // default error codes, may be overridden by inherited classes.
118 
119  // special tags used by this class ... 32000 is arbitrary
121 
122  // special codes used as 'wildcards' to match any node or tag
123  // These are listed again because they should be here, but the global
124  // values are kept for compatibility.
125  enum CommCodes { COMM_ANY_NODE = (-1), COMM_ANY_TAG = (-1) };
126 
127 public:
128 
129  // constructor and destructor
130  // constructor arguments: command-line args, and number of processes
131  // to start (if < 0, start the 'default' number, i.e. the number of
132  // hosts in a PVM virtual machine, the number of nodes in an O2K, etc)
133  Communicate(int argc = 0, char** argv = NULL, int procs = (-1));
134  virtual ~Communicate(void);
135 
136  // return the name of this item
137  virtual const char *name() const
138  {
139  return "Serial";
140  }
141 
142  // return info about connections in general
143  int getNodes() const
144  {
145  return TotalNodes;
146  }
147  int getContexts(const int n) const
148  {
149  return Contexts[n];
150  }
151  int getProcesses(const int n, const int c) const
152  {
153  return Processes[n][c];
154  }
155  int myNode() const
156  {
157  return myHost;
158  }
159  int getError() const
160  {
161  return ErrorStatus;
162  }
163  int getReceived() const
164  {
165  return recMsgList.size();
166  }
167 
168  //
169  // nonvirtual routines to send/receive data
170  //
171 
172  // send data to another node. Returns success (T or F).
173  // last argument specifies whether to delete the Message after sending
174  // (if message is for another node). Note that if the send is not
175  // successful, the message will NOT be deleted, regardless of delmsg.
176  bool send(Message *, int node, int tag, bool delmsg = true);
177 
178  // receive data from another node. Returns newly created Message object
179  // with received message, or NULL if no message is available.
180  // If node is < 0, this will receive the next message with the given tag
181  // from any node. If tag < 0, this will receive the next message with
182  // any tag from the given node. If both are < 0, this will receive the
183  // next message, period. node and tag are passed by reference; if either
184  // is < 0, and a message is received, they are changed to their actual
185  // values.
186  Message *receive(int& node, int& tag);
187 
188  // a blocking version of receive;
189  Message *receive_block(int& node, int& tag);
190 
191  //send and receive for raw data
192  virtual bool raw_send(void *, int , int , int )
193  {
194  return false;
195  }
196  virtual MPI_Request raw_isend(void *, int , int , int )
197  {
198  return MPI_Request();
199  }
200  virtual int raw_receive(char *, int , int &, int &)
201  {
202  return 0;
203  }
204  virtual MPI_Request raw_ireceive(char *, int , int , int )
205  {
206  return MPI_Request();
207  }
208  virtual int raw_probe_receive(char *&, int &, int &)
209  {
210  return 0;
211  }
212 
213 
214  //
215  // virtual routines to broadcast data
216  //
217 
218  // broadcast the current message to other nodes.
219  // Return number of nodes actually sent to.
220  // The first version sends to all nodes including this node.
221  // The second version sends to all nodes except this node.
222  // The first argument is the Message; the last argument is the tag.
223  virtual int broadcast_all(Message *, int);
224  virtual int broadcast_others(Message *, int, bool delmsg=true);
225 
226 
227  //
228  // routines to synchronize processors at a barrier
229  //
230 
231  // Synchronize all processors (everybody waits for everybody
232  // else to get here before returning to calling function).
233  void barrier(void);
234 
235  //
236  // virtual routines to deal with memory management
237  //
238 
239  // clean up after a Message has been used (called by Message). By
240  // default, does nothing.
241  virtual void cleanupMessage(void *);
242 
243 protected:
244  // struct used to store messages, tags, and nodes
245  struct MessageData
246  {
247  int node; // sending/receiving node
248  int tag; // tag of the message
249  Message *msg; // pointer to the message itself
250  MessageData(int n, int t, Message *m) : node(n),tag(t),msg(m) {}
251  MessageData() : node(-1), tag(-1), msg(0) { }
252  MessageData(const MessageData& m) : node(m.node),tag(m.tag),msg(m.msg) {}
254  };
255 
256  // a list of messages which have already been received, but not yet
257  // delivered
258  std::vector<MessageData> recMsgList;
259 
260  // the following items should be filled in by the derived classes
261  int TotalNodes; // number of nodes available (0 ... # nodes-1)
262  int myHost; // which node am I?
263  int ErrorStatus; // error code, from above enumeration
264  std::vector<int> Contexts; // the number of contexts per node
265  std::vector< std::vector<int> > Processes; // number of running processes per context
266 
267  // An integer message number identifier; this is included in each
268  // message, and continually increases as more messages are sent.
269  typedef long MsgNum_t;
271 
272  // An optional sent-message cache, used to attempt to retransmit
273  // messages if they are corrupted in-transit. Messages are keyed on
274  // a message number, which is unique for each message.
275  typedef std::map<MsgNum_t, CommSendInfo> SentCache_t;
277 
278  // a list of things to resend at the next opportunity
279  std::vector<MsgNum_t> resendList;
280 
281  // a list of messages which have been received OK
282  std::vector<MsgNum_t> sentOKList;
283 
284  // a list of messages which should be cleared out on other nodes
285  std::vector<std::pair<int,MsgNum_t> > informOKList;
286 
287  // a list of requests we must make to other nodes to resend messages
288  std::vector<std::pair<int,MsgNum_t> > requestList;
289 
290  // add a new message to the received message queues. Return success.
291  // arguments: message, sending node, tag
292  bool add_msg(Message *, int, int);
293 
294  // Looks for a message in the message queue from the specified node
295  // and tag. This understands wildcards for node and tag.
296  // Returns a pointer to the Message object found, and sets node and
297  // tag equal to the proper values. Also, this will remove the item from
298  // the queue.
299  Message* find_msg(int&, int&);
300 
301  //
302  // implementation-specific routines (which begin with 'my')
303  // these should be provided in a derived class, and contain the
304  // comm-library-specific code
305  //
306 
307  // send a message ... arguments are the Message itself, the
308  // destination node, the 'user' tag, and the 'encoding' tag.
309  // Messages should be sent via the underlying mechanism by using the
310  // encoding tag (one of the COMM_ tags),
311  // and should embed the information about what the user
312  // tag is in the data sent between nodes. Return success.
313  virtual bool mysend(Message *, int node, int utag, int etag);
314 
315  // receive a message from the given node and user tag. Return a NEW
316  // Message object if a message arrives, or NULL if no message available.
317  // node will be set to the node from which the message was sent.
318  // tag will be set to the 'user tag' for that message.
319  // etag is the 'encoding' tag, and must be one of the COMM_ tags.
320  // Only message sent via the underlying mechanism with the
321  // given etag are checked. When one is found, the user tag and sending
322  // node are extracted from the sent data.
323  // If node = COMM_ANY_NODE, checks for messages from any node.
324  // If tag = COMM_ANY_TAG, checks for messages with any user tag.
325  virtual Message *myreceive(int& node, int& tag, int etag);
326 
327  // Synchronize all processors (everybody waits for everybody
328  // else to get here before returning to calling function).
329  virtual void mybarrier(void);
330 
331  // resend a message buffer that has been previously packed and copied
332  // into the provided buffer. Return success.
333  virtual bool resend(void *buf, int size, int node, int etag);
334 
335  //
336  // utility functions used to serialize data into and out of byte buffers
337  //
338  // standard way to create and free buffer storage
// Allocate 'size' bytes of untyped buffer storage with std::malloc.
// Returns nullptr if the allocation fails; release with freebuffer().
static inline void *makebuffer(int size)
{
    // <cstdlib> is only required to declare std::malloc; whether the
    // unqualified global name is also declared is unspecified.
    return std::malloc(size);
}
// Release storage obtained from makebuffer(). A null pointer is accepted
// and ignored (std::free semantics).
static inline void freebuffer(void *buf)
{
    // Qualified for the same reason as makebuffer: <cstdlib> guarantees
    // std::free, not necessarily the global name.
    std::free(buf);
}
347 
// Round 'size' up to the next multiple of sizeof(long long). This is the
// number of buffer bytes an item of 'size' bytes occupies, so that
// consecutive packed items stay long-long aligned relative to the buffer start.
static inline unsigned int wordround(int size)
{
    const std::size_t word = sizeof(long long);
    const std::size_t wordCount = (size + word - 1) / word;
    return static_cast<unsigned int>(wordCount * word);
}
355 
356  // compute a wordround value for 'size' bytes, then add that to the
357  // given 'pos' pointer
358  static inline void addwordround(void * &pos, int size)
359  {
360  pos = static_cast<void *>(wordround(size) + static_cast<char *>(pos));
361  }
362 
363  // memcpy data into the given location, and then increment the pointer
364  static inline void pack(void *packdata, void * &pos, int size)
365  {
366  memcpy(pos, packdata, size);
367  addwordround(pos, size);
368  }
369 
370  // memcpy data out of a given location to another, updating 'pos'
371  static inline void unpack(void * &pos, void *packdata, int size)
372  {
373  memcpy(packdata, pos, size);
374  addwordround(pos, size);
375  }
376 
377  //
378  // utility functions used in packing and unpacking Message data
379  //
380 
381  // calculate how big the buffer must be to send the given message
382  int find_msg_length(Message &);
383 
384  // put data from the given Message into the given buffer, with tag value.
385  // the final arguments are the buffer size, in bytes, and the dest node.
386  void fill_msg_buffer(void *, Message &, int, int, int);
387 
388  // take data out of the current receive buf and create a new Message
389  Message *unpack_message(int &node, int &tag, void *pos);
390 
391  //
392  // utility functions used for message caching/retransmit
393  //
394 
395  // put the given message buffer in the sent-message cache, as a new
396  // CommSendInfo object storing the buffer and other information.
397  void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node);
398 
399  // send off a request to have this message retransmitted to us
400  void request_retransmission(int node, MsgNum_t mnum);
401 
402  // resend the data for message mnum ... calls the virtual 'resend'
403  void perform_resend(MsgNum_t mnum);
404 
405  // get the resend information from a buffer sent in a message requesting
406  // retransmission
407  void unpack_retransmission_request(int nitems, void *pos);
408 
409  // tell the sender that we received this message OK
410  void send_ok_message(int node, MsgNum_t mnum);
411 
412  // unpack message with a list of OK message numbers, and delete them
413  // from our cache
414  void clear_ok_messages(int nitems, void *pos);
415 
416  // remove a single OK message
418 
419  // process list of resend requests
421 };
422 
423 #endif // COMMUNICATE_H
void clear_ok_messages(int nitems, void *pos)
std::ostream & operator<<(std::ostream &os, const Attribute &attr)
Definition: Attribute.cpp:167
char * buf()
Definition: Communicate.h:88
Communicate(int argc=0, char **argv=NULL, int procs=(-1))
Definition: Communicate.cpp:70
std::vector< MessageData > recMsgList
Definition: Communicate.h:258
virtual ~Communicate(void)
Definition: Communicate.cpp:85
CommSendInfo(int size, char *buf, int node)
Definition: Communicate.h:54
char * buf_m
Definition: Communicate.h:107
bool add_msg(Message *, int, int)
Definition: Communicate.cpp:98
static unsigned int wordround(int size)
Definition: Communicate.h:350
static void unpack(void *&pos, void *packdata, int size)
Definition: Communicate.h:371
int myNode() const
Definition: Communicate.h:155
void fill_msg_buffer(void *, Message &, int, int, int)
int getReceived() const
Definition: Communicate.h:163
virtual MPI_Request raw_isend(void *, int, int, int)
Definition: Communicate.h:196
virtual int raw_probe_receive(char *&, int &, int &)
Definition: Communicate.h:208
int getContexts(const int n) const
Definition: Communicate.h:147
std::vector< std::vector< int > > Processes
Definition: Communicate.h:265
void freebuf()
Definition: Communicate.h:97
virtual MPI_Request raw_ireceive(char *, int, int, int)
Definition: Communicate.h:204
std::vector< int > Contexts
Definition: Communicate.h:264
void barrier(void)
const int COMM_ANY_NODE
Definition: Communicate.h:40
int find_msg_length(Message &)
int getError() const
Definition: Communicate.h:159
virtual bool mysend(Message *, int node, int utag, int etag)
virtual bool raw_send(void *, int, int, int)
Definition: Communicate.h:192
virtual void mybarrier(void)
MsgNum_t nextMsgNum
Definition: Communicate.h:270
void request_retransmission(int node, MsgNum_t mnum)
MessageData(int n, int t, Message *m)
Definition: Communicate.h:250
int size() const
Definition: Communicate.h:78
SentCache_t sentMsgCache
Definition: Communicate.h:276
const int COMM_ANY_TAG
Definition: Communicate.h:41
constexpr double c
The velocity of light in m/s.
Definition: Physics.h:52
static void addwordround(void *&pos, int size)
Definition: Communicate.h:358
virtual int broadcast_others(Message *, int, bool delmsg=true)
static void * makebuffer(int size)
Definition: Communicate.h:339
static void freebuffer(void *buf)
Definition: Communicate.h:343
int getProcesses(const int n, const int c) const
Definition: Communicate.h:151
virtual void cleanupMessage(void *)
void process_resend_requests()
Message * find_msg(int &, int &)
MessageData(const MessageData &m)
Definition: Communicate.h:252
static void pack(void *packdata, void *&pos, int size)
Definition: Communicate.h:364
std::map< MsgNum_t, CommSendInfo > SentCache_t
Definition: Communicate.h:275
std::vector< std::pair< int, MsgNum_t > > informOKList
Definition: Communicate.h:285
int node() const
Definition: Communicate.h:83
void unpack_retransmission_request(int nitems, void *pos)
virtual int raw_receive(char *, int, int &, int &)
Definition: Communicate.h:200
std::vector< MsgNum_t > sentOKList
Definition: Communicate.h:282
virtual bool resend(void *buf, int size, int node, int etag)
std::vector< std::pair< int, MsgNum_t > > requestList
Definition: Communicate.h:288
void perform_resend(MsgNum_t mnum)
Message * receive_block(int &node, int &tag)
virtual Message * myreceive(int &node, int &tag, int etag)
CommSendInfo & operator=(const CommSendInfo &c)
Definition: Communicate.h:70
Message * receive(int &node, int &tag)
Message * unpack_message(int &node, int &tag, void *pos)
virtual const char * name() const
Definition: Communicate.h:137
bool send(Message *, int node, int tag, bool delmsg=true)
std::vector< MsgNum_t > resendList
Definition: Communicate.h:279
virtual int broadcast_all(Message *, int)
void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node)
int getNodes() const
Definition: Communicate.h:143
CommSendInfo(const CommSendInfo &c)
Definition: Communicate.h:59
const char * buf() const
Definition: Communicate.h:92
void send_ok_message(int node, MsgNum_t mnum)
void remove_single_ok_message(MsgNum_t mnum)