OPAL (Object Oriented Parallel Accelerator Library) 2022.1
OPAL
Communicate.h
Go to the documentation of this file.
1// -*- C++ -*-
2/***************************************************************************
3 *
4 * The IPPL Framework
5 *
6 *
7 * Visit http://people.web.psi.ch/adelmann/ for more details
8 *
9 ***************************************************************************/
10
11#ifndef COMMUNICATE_H
12#define COMMUNICATE_H
13
14/***************************************************************************
15 * Communicate.h - communications object for use with Ippl framework. Allows
16 * user to establish id's for available nodes, establish connections, and
17 * send/receive data.
18 ***************************************************************************/
19
20// include files
21#include "Message/TagMaker.h"
22#include "Message/Tags.h"
23#include <cstdlib>
24#include <memory>
25#include <cstring>
26
27#include <vector>
28#include <utility>
29
30#include <iostream>
31
32#include <mpi.h>
33
34// forward declarations
35class Message;
36class Communicate;
37std::ostream& operator<<(std::ostream&, const Communicate&);
38
39// special codes used as 'wildcards' to match any node or tag
40const int COMM_ANY_NODE = (-1);
41const int COMM_ANY_TAG = (-1);
42
43
44// A simple class used to store information for caching sent messages. This
45// is only used if the 'retransmit' option is active.
47{
48public:
50 : size_m(0), buf_m(0)
51 {
52 }
53
54 CommSendInfo(int size, char *buf, int node)
56 {
57 }
58
61 {
62 }
63
65 {
66 // the user is actually responsible for freeing the buffer. We
67 // do not do this automatically here
68 }
69
71 {
72 size_m = c.size_m;
73 buf_m = c.buf_m;
74 node_m = c.node_m;
75 return *this;
76 }
77
// Read-only accessor: returns the stored size_m value.
// NOTE(review): presumably the length in bytes of the cached send buffer --
// confirm against the constructor, which is not fully visible in this listing.
78 int size() const
79 {
80 return size_m;
81 }
82
// Read-only accessor: returns the stored node_m value.
// NOTE(review): presumably the destination node of the cached message --
// confirm against the code that fills the sent-message cache.
83 int node() const
84 {
85 return node_m;
86 }
87
// Mutable accessor: returns the raw buf_m pointer (may be null). Ownership
// is not transferred; the pointer is simply handed out as stored.
88 char *buf()
89 {
90 return buf_m;
91 }
// Const overload of buf(): returns the same buf_m pointer, but as a pointer
// to const data so the cached bytes cannot be modified through it.
92 const char *buf() const
93 {
94 return buf_m;
95 }
96
// Releases the buffer with delete[] (if one is set) and resets buf_m to
// null, so calling freebuf() a second time is harmless.
// Per the existing comment elsewhere in this class, the user is responsible
// for freeing the buffer; nothing visible here calls freebuf() automatically.
// NOTE(review): delete[] is only valid for memory obtained from new[] --
// confirm cached buffers are never allocated via Communicate::makebuffer,
// which uses malloc.
97 void freebuf()
98 {
99 if (buf_m != 0)
100 delete [] buf_m;
101 buf_m = 0;
102 }
103
104private:
107 char *buf_m;
108};
109
110
111// The base class for all specific Communicate objects
112class Communicate : public TagMaker
113{
114
115public:
116 // default error codes, may be overridden by inherited classes.
118
119 // special tags used by this class ... 32000 is arbitrary
121
122 // special codes used as 'wildcards' to match any node or tag
123 // These are listed again because they should be here, but the global
124 // values are kept for compatibility.
125 enum CommCodes { COMM_ANY_NODE = (-1), COMM_ANY_TAG = (-1) };
126
127public:
128
129 // constructor and destructor
130 // constructor arguments: command-line args, and number of processes
131 // to start (if < 0, start the 'default' number, i.e. the number of
132 // hosts in a PVM virtual machine, the number of nodes in an O2K, etc)
133 Communicate(int argc = 0, char** argv = NULL, int procs = (-1));
134 virtual ~Communicate(void);
135
136 // return the name of this item
// The base class reports the string literal "Serial". The function is
// virtual, so a derived communicator can return its own name instead.
137 virtual const char *name() const
138 {
139 return "Serial";
140 }
141
142 // return info about connections in general
// Returns TotalNodes, which the member comment describes as the number of
// nodes available (numbered 0 ... #nodes-1).
143 int getNodes() const
144 {
145 return TotalNodes;
146 }
// Returns Contexts[n]; the member comment describes Contexts as the number
// of contexts per node. The index n is not range-checked (plain
// std::vector::operator[]), so n must be a valid index into Contexts.
147 int getContexts(const int n) const
148 {
149 return Contexts[n];
150 }
// Returns Processes[n][c]; the member comment describes Processes as the
// number of running processes per context. Neither index is range-checked,
// so both n and c must be valid for the nested vectors.
151 int getProcesses(const int n, const int c) const
152 {
153 return Processes[n][c];
154 }
// Returns myHost, i.e. the index of the node this object is running on
// (the member comment reads "which node am I?").
155 int myNode() const
156 {
157 return myHost;
158 }
// Returns the current ErrorStatus value (documented at the member as an
// error code). This accessor only reads the value; it does not reset it.
159 int getError() const
160 {
161 return ErrorStatus;
162 }
// Number of entries currently held in recMsgList, the list of messages that
// have been received but not yet delivered. vector::size() returns an
// unsigned size type that is implicitly narrowed to int here.
163 int getReceived() const
164 {
165 return recMsgList.size();
166 }
167
168 //
169 // nonvirtual routines to send/receive data
170 //
171
172 // send data to another node. Returns success (T or F).
173 // last argument specifies whether to delete the Message after sending
174 // (if message is for another node). Note that if the send is not
175 // successful, the message will NOT be deleted, regardless of delmsg.
176 bool send(Message *, int node, int tag, bool delmsg = true);
177
178 // receive data from another node. Returns newly created Message object
179 // with received message, or NULL if no message is available.
180 // If node is < 0, this will receive the next message with the given tag
181 // from any node. If tag < 0, this will receive the next message with
182 // any tag from the given node. If both are < 0, this will receive the
183 // next message, period. node and tag are passed by reference; if either
184 // is < 0, and a message is received, they are changed to their actual
185 // values.
186 Message *receive(int& node, int& tag);
187
188 // a blocking version of receive;
189 Message *receive_block(int& node, int& tag);
190
191 //send and receive for raw data
// Base-class default: ignores all four arguments and reports failure by
// returning false.
// NOTE(review): the parameters are unnamed here; presumably
// (buffer, size, node, tag) -- confirm against the overriding implementation.
192 virtual bool raw_send(void *, int , int , int )
193 {
194 return false;
195 }
// Base-class default: ignores its arguments and returns a value-initialized
// MPI_Request; no send is started.
// NOTE(review): a value-initialized handle is not guaranteed to equal
// MPI_REQUEST_NULL -- confirm that no caller waits on or tests the request
// returned by this base-class version.
196 virtual MPI_Request raw_isend(void *, int , int , int )
197 {
198 return MPI_Request();
199 }
// Base-class default: ignores its arguments, leaves the two reference
// parameters untouched, and returns 0.
// NOTE(review): presumably the return value is a byte count and the
// references are node/tag outputs -- confirm against the overriding
// implementation.
200 virtual int raw_receive(char *, int , int &, int &)
201 {
202 return 0;
203 }
// Base-class default: ignores its arguments and returns a value-initialized
// MPI_Request; no receive is posted.
// NOTE(review): a value-initialized handle is not guaranteed to equal
// MPI_REQUEST_NULL -- confirm that no caller waits on or tests the request
// returned by this base-class version.
204 virtual MPI_Request raw_ireceive(char *, int , int , int )
205 {
206 return MPI_Request();
207 }
// Base-class default: ignores its arguments, leaves all three reference
// parameters (including the buffer pointer) unchanged, and returns 0.
// NOTE(review): presumably an override allocates the buffer and reports
// node/tag through the references -- confirm against that implementation.
208 virtual int raw_probe_receive(char *&, int &, int &)
209 {
210 return 0;
211 }
212
213
214 //
215 // virtual routines to broadcast data
216 //
217
218 // broadcast the current message to other nodes.
219 // Return number of nodes actually sent to.
220 // The first version sends to all nodes including this node.
221 // The second version sends to all nodes except this node.
222 // The first argument is the Message; the last argument is the tag.
223 virtual int broadcast_all(Message *, int);
224 virtual int broadcast_others(Message *, int, bool delmsg=true);
225
226
227 //
228 // routines to synchronize processors at a barrier
229 //
230
231 // Synchronize all processors (everybody waits for everybody
232 // else to get here before returning to calling function).
233 void barrier(void);
234
235 //
236 // virtual routines to deal with memory management
237 //
238
239 // clean up after a Message has been used (called by Message). By
240 // default, does nothing.
241 virtual void cleanupMessage(void *);
242
243protected:
244 // struct used to store messages, tags, and nodes
246 {
247 int node; // sending/receiving node
248 int tag; // tag of the message
249 Message *msg; // pointer to the message itself
250 MessageData(int n, int t, Message *m) : node(n),tag(t),msg(m) {}
251 MessageData() : node(-1), tag(-1), msg(0) { }
252 MessageData(const MessageData& m) : node(m.node),tag(m.tag),msg(m.msg) {}
254 };
255
256 // a list of messages which have already been received, but not yet
257 // delivered
258 std::vector<MessageData> recMsgList;
259
260 // the following items should be filled in by the derived classes
261 int TotalNodes; // number of nodes available (0 ... # nodes-1)
262 int myHost; // which node am I?
263 int ErrorStatus; // error code, from above enumeration
264 std::vector<int> Contexts; // the number of contexts per node
265 std::vector< std::vector<int> > Processes; // number of running processes per context
266
267 // An integer message number identifier; this is included in each
268 // message, and continually increases as more messages are sent.
269 typedef long MsgNum_t;
271
272 // An optional sent-message cache, used to attempt to retransmit
273 // messages if they are corrupted in-transit. Messages are keyed on
274 // a message number, which is unique for each message.
275 typedef std::map<MsgNum_t, CommSendInfo> SentCache_t;
277
278 // a list of things to resend at the next opportunity
279 std::vector<MsgNum_t> resendList;
280
281 // a list of messages which have been received OK
282 std::vector<MsgNum_t> sentOKList;
283
284 // a list of messages which should be cleared out on other nodes
285 std::vector<std::pair<int,MsgNum_t> > informOKList;
286
287 // a list of requests we must make to other nodes to resend messages
288 std::vector<std::pair<int,MsgNum_t> > requestList;
289
290 // add a new message to the received message queues. Return success.
291 // arguments: message, sending node, tag
292 bool add_msg(Message *, int, int);
293
294 // Looks for a message in the message queue from the specified node
295 // and tag. This understands wildcards for node and tag.
296 // Returns a pointer to the Message object found, and sets node and
297 // tag equal to the proper values. Also, this will remove the item from
298 // the queue.
299 Message* find_msg(int&, int&);
300
301 //
302 // implementation-specific routines (which begin with 'my')
303 // these should be provided in a derived class, and contain the
304 // comm-library-specific code
305 //
306
307 // send a message ... arguments are the Message itself, the
308 // destination node, the 'user' tag, and the 'encoding' tag.
309 // Messages should be sent via the underlying mechanism by using the
310 // encoding tag (one of the COMM_ tags),
311 // and should embed the information about what the user
312 // tag is in the data sent between nodes. Return success.
313 virtual bool mysend(Message *, int node, int utag, int etag);
314
315 // receive a message from the given node and user tag. Return a NEW
316 // Message object if a message arrives, or NULL if no message available.
317 // node will be set to the node from which the message was sent.
318 // tag will be set to the 'user tag' for that message.
319 // etag is the 'encoding' tag, and must be one of the COMM_ tags.
320 // Only messages sent via the underlying mechanism with the
321 // given etag are checked. When one is found, the user tag and sending
322 // node are extracted from the sent data.
323 // If node = COMM_ANY_NODE, checks for messages from any node.
324 // If tag = COMM_ANY_TAG, checks for messages with any user tag.
325 virtual Message *myreceive(int& node, int& tag, int etag);
326
327 // Synchronize all processors (everybody waits for everybody
328 // else to get here before returning to calling function).
329 virtual void mybarrier(void);
330
331 // resend a message buffer that has been previously packed and copied
332 // into the provided buffer. Return success.
333 virtual bool resend(void *buf, int size, int node, int etag);
334
335 //
336 // utility functions used to serialize data into and out of byte buffers
337 //
338 // standard way to create and free buffer storage
// Allocates 'size' bytes with malloc and returns the pointer unchanged; a
// failed allocation is therefore passed to the caller as a null pointer.
// Memory obtained here must be released with freebuffer() (free), never
// with delete / delete[].
339 static inline void *makebuffer(int size)
340 {
341 return malloc(size);
342 }
// Counterpart of makebuffer(): hands 'buf' to free(). Passing a null
// pointer is allowed (free(NULL) is a no-op); the caller's pointer variable
// is not reset.
343 static inline void freebuffer(void *buf)
344 {
345 free(buf);
346 }
347
// Round 'size' bytes up to the next multiple of sizeof(long long), the
// "word" unit used to keep every item packed into a buffer aligned.
// A zero or negative size yields 0. The arithmetic is done entirely in
// unsigned int after that check, so a negative argument can no longer wrap
// around through size_t and come back as a huge bogus length.
static inline unsigned int wordround(int size)
{
    if (size <= 0)
        return 0;

    const unsigned int word = static_cast<unsigned int>(sizeof(long long));
    const unsigned int bytes = static_cast<unsigned int>(size);
    return word * ((bytes + word - 1) / word);
}

// Advance the cursor 'pos' by wordround(size) bytes, i.e. past an item of
// 'size' bytes plus whatever padding keeps the next item word-aligned.
static inline void addwordround(void * &pos, int size)
{
    pos = static_cast<void *>(static_cast<char *>(pos) + wordround(size));
}

// Copy 'size' bytes from 'packdata' into the buffer at 'pos', then advance
// 'pos' to the next word-aligned slot.
// Nothing is copied when size <= 0: memcpy must not be handed a null or
// otherwise invalid pointer even for a zero count, and a negative count
// would be converted to an enormous size_t.
static inline void pack(void *packdata, void * &pos, int size)
{
    if (size > 0)
        std::memcpy(pos, packdata, static_cast<std::size_t>(size));
    addwordround(pos, size);
}

// Copy 'size' bytes out of the buffer at 'pos' into 'packdata', then advance
// 'pos' to the next word-aligned slot. Mirrors pack(), including the
// "size <= 0 copies nothing" rule.
static inline void unpack(void * &pos, void *packdata, int size)
{
    if (size > 0)
        std::memcpy(packdata, pos, static_cast<std::size_t>(size));
    addwordround(pos, size);
}
376
377 //
378 // utility functions used in packing and unpacking Message data
379 //
380
381 // calculate how big the buffer must be to send the given message
383
384 // put data from the given Message into the given buffer, with tag value.
385 // the final arguments are the buffer size, in bytes, and the dest node.
386 void fill_msg_buffer(void *, Message &, int, int, int);
387
388 // take data out of the current receive buf and create a new Message
389 Message *unpack_message(int &node, int &tag, void *pos);
390
391 //
392 // utility functions used for message caching/retransmit
393 //
394
395 // put the given message buffer in the sent-message cache, as a new
396 // CommSendInfo object storing the buffer and other information.
397 void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node);
398
399 // send off a request to have this message retransmitted to us
400 void request_retransmission(int node, MsgNum_t mnum);
401
402 // resend the data for message mnum ... calls the virtual 'resend'
403 void perform_resend(MsgNum_t mnum);
404
405 // get the resend information from a buffer sent in a message requesting
406 // retransmission
407 void unpack_retransmission_request(int nitems, void *pos);
408
409 // tell the sender that we received this message OK
410 void send_ok_message(int node, MsgNum_t mnum);
411
412 // unpack message with a list of OK message numbers, and delete them
413 // from our cache
414 void clear_ok_messages(int nitems, void *pos);
415
416 // remove a single OK message
418
419 // process list of resend requests
421};
422
423#endif // COMMUNICATE_H
const int COMM_ANY_NODE
Definition: Communicate.h:40
std::ostream & operator<<(std::ostream &, const Communicate &)
Definition: Communicate.cpp:47
const int COMM_ANY_TAG
Definition: Communicate.h:41
constexpr double c
The velocity of light in m/s.
Definition: Physics.h:45
CommSendInfo & operator=(const CommSendInfo &c)
Definition: Communicate.h:70
char * buf()
Definition: Communicate.h:88
int node() const
Definition: Communicate.h:83
void freebuf()
Definition: Communicate.h:97
const char * buf() const
Definition: Communicate.h:92
CommSendInfo(const CommSendInfo &c)
Definition: Communicate.h:59
CommSendInfo(int size, char *buf, int node)
Definition: Communicate.h:54
char * buf_m
Definition: Communicate.h:107
int size() const
Definition: Communicate.h:78
virtual MPI_Request raw_isend(void *, int, int, int)
Definition: Communicate.h:196
virtual bool mysend(Message *, int node, int utag, int etag)
std::vector< int > Contexts
Definition: Communicate.h:264
std::map< MsgNum_t, CommSendInfo > SentCache_t
Definition: Communicate.h:275
void request_retransmission(int node, MsgNum_t mnum)
virtual Message * myreceive(int &node, int &tag, int etag)
static void * makebuffer(int size)
Definition: Communicate.h:339
int getProcesses(const int n, const int c) const
Definition: Communicate.h:151
void perform_resend(MsgNum_t mnum)
void remove_single_ok_message(MsgNum_t mnum)
virtual MPI_Request raw_ireceive(char *, int, int, int)
Definition: Communicate.h:204
std::vector< MessageData > recMsgList
Definition: Communicate.h:258
static void freebuffer(void *buf)
Definition: Communicate.h:343
static void addwordround(void *&pos, int size)
Definition: Communicate.h:358
Message * receive(int &node, int &tag)
int getReceived() const
Definition: Communicate.h:163
bool send(Message *, int node, int tag, bool delmsg=true)
Message * find_msg(int &, int &)
void send_ok_message(int node, MsgNum_t mnum)
std::vector< MsgNum_t > resendList
Definition: Communicate.h:279
std::vector< std::pair< int, MsgNum_t > > informOKList
Definition: Communicate.h:285
int find_msg_length(Message &)
virtual void cleanupMessage(void *)
virtual int raw_probe_receive(char *&, int &, int &)
Definition: Communicate.h:208
void unpack_retransmission_request(int nitems, void *pos)
virtual int broadcast_others(Message *, int, bool delmsg=true)
int getContexts(const int n) const
Definition: Communicate.h:147
std::vector< std::pair< int, MsgNum_t > > requestList
Definition: Communicate.h:288
static unsigned int wordround(int size)
Definition: Communicate.h:350
void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node)
Communicate(int argc=0, char **argv=NULL, int procs=(-1))
Definition: Communicate.cpp:69
MsgNum_t nextMsgNum
Definition: Communicate.h:270
virtual bool resend(void *buf, int size, int node, int etag)
Message * receive_block(int &node, int &tag)
virtual const char * name() const
Definition: Communicate.h:137
void barrier(void)
virtual void mybarrier(void)
std::vector< std::vector< int > > Processes
Definition: Communicate.h:265
static void unpack(void *&pos, void *packdata, int size)
Definition: Communicate.h:371
int getNodes() const
Definition: Communicate.h:143
int getError() const
Definition: Communicate.h:159
virtual ~Communicate(void)
Definition: Communicate.cpp:84
virtual int broadcast_all(Message *, int)
virtual bool raw_send(void *, int, int, int)
Definition: Communicate.h:192
std::vector< MsgNum_t > sentOKList
Definition: Communicate.h:282
bool add_msg(Message *, int, int)
Definition: Communicate.cpp:97
static void pack(void *packdata, void *&pos, int size)
Definition: Communicate.h:364
void clear_ok_messages(int nitems, void *pos)
Message * unpack_message(int &node, int &tag, void *pos)
void process_resend_requests()
int myNode() const
Definition: Communicate.h:155
SentCache_t sentMsgCache
Definition: Communicate.h:276
virtual int raw_receive(char *, int, int &, int &)
Definition: Communicate.h:200
void fill_msg_buffer(void *, Message &, int, int, int)
MessageData(int n, int t, Message *m)
Definition: Communicate.h:250
MessageData(const MessageData &m)
Definition: Communicate.h:252