OPAL (Object Oriented Parallel Accelerator Library) 2.2.0
CommSHMEMPI.cpp
1 // -*- C++ -*-
2 /***************************************************************************
3  *
4  * The IPPL Framework
5  *
6  * This program was prepared by PSI.
7  * All rights in the program are reserved by PSI.
8  * Neither PSI nor the author(s)
9  * makes any warranty, express or implied, or assumes any liability or
10  * responsibility for the use of this software
11  *
12  * Visit www.amas.web.psi for more details
13  *
14  ***************************************************************************/
15 
16 // -*- C++ -*-
17 /***************************************************************************
18  *
19  * The IPPL Framework
20  *
21  *
22  * Visit http://people.web.psi.ch/adelmann/ for more details
23  *
24  ***************************************************************************/
25 
26 // include files
27 #include "Message/CommSHMEMPI.h"
28 #include "Message/Message.h"
29 #include "Utility/IpplInfo.h"
30 
31 
32 // include mpi header file.
33 #include <mpi.h>
34 
35 #include <cstring>
36 #include <cstdlib>
37 #include <unistd.h>
38 
39 
40 // if an error occurs during myreceive more times than this, CommSHMEMPI
41 // will just exit. Make it negative to totally disable checking for a
42 // maximum number of errors
43 #define MAX_SHMEMPI_ERRS 500
44 
45 
46 // static data to keep track of errors
47 static int numErrors = 0;
48 static int size_of_SHMEMPI_INT; /* needed for tracing */
49 
50 // temporary buffer used for speed
51 #define PSIZE 1024*16
52 #define PACKSIZE ((PSIZE)*sizeof(long))
53 static long shmempipackbuf[PSIZE];
54 
55 
56 
58 // constructor. arguments: command-line args, and number of processes
59 // to start (if < 0, start the 'default' number ... for MPI, this value
60 // will be ignored, since the number of nodes is determined by the args
61 // to mpirun).
62 CommSHMEMPI::CommSHMEMPI(int& argc , char**& argv, int procs)
63  : Communicate(argc, argv, procs)
64 {
65 
66  int i, reported, rep_host, ierror, result_len;
67  MPI_Status stat;
68  char *currtok, *nexttok, *execname;
69 
70  // a little "string magic" to strip the absolute pathname off the executable
71  currtok = strstr(argv[0],"/");
72  if (!currtok)
73  {
74  execname = strdup(argv[0]);
75  }
76  else
77  {
78  currtok++;
79  nexttok = strstr(currtok,"/");
80  while (nexttok)
81  {
82  currtok = nexttok+1;
83  nexttok = strstr(currtok,"/");
84  }
85  execname = strdup(currtok);
86  }
87 
88  // initialize mpi
89  MPI_Init(&argc, &argv);
90 
91  // restore original executable name without absolute path
92  strcpy(argv[0],execname);
93 
94  // determine the number of nodes running and my node number
95  MPI_Comm_size(MPI_COMM_WORLD,&TotalNodes);
96  MPI_Comm_rank(MPI_COMM_WORLD,&myHost);
97 
98  // make sure we do not have too many processes running
99  if (procs > 0 && procs < TotalNodes)
100  {
101  // if this is a process that is beyond what we had requested, just exit
102  if (myHost >= procs)
103  Ippl::abort();
104  TotalNodes = procs;
105  }
106 
107  MPI_Type_size ( MPI_INT, &size_of_SHMEMPI_INT );
108  if (myHost == 0) // this code is run by the master process
109  {
110  // send a message to each child node
111  for (i = 1; i < TotalNodes; i++)
112  {
113  MPI_Send(&myHost, 1, MPI_INT, i, COMM_HOSTS_TAG, MPI_COMM_WORLD);
114 
115  }
116 
117  // wait for the spawned processes to report back that they're ready
118  std::vector<int> child_ready(TotalNodes);
119  for (i = 0; i < TotalNodes; child_ready[i++] = 0);
120  INFOMSG("CommSHMEMPI: Parent process waiting for children ..." << endl);
121  reported = 1; // since the parent is already ready
122  while (reported < TotalNodes)
123  {
124  ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
125  COMM_HOSTS_TAG, MPI_COMM_WORLD, &stat);
126 
127  if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host]))
128  {
129  child_ready[rep_host] = 1;
130  reported++;
131  INFOMSG("CommSHMEMPI: Child " << rep_host << " ready." << endl);
132  }
133  else
134  {
135  ERRORMSG("CommSHMEMPI: Error with child reporting to parent. ");
136  ERRORMSG("rep_host = " << rep_host);
137  ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
138  }
139  }
140 
141  INFOMSG("CommSHMEMPI: Initialization complete." << endl);
142 
143  }
144 else // this is a child process; get data from the parent
145  {
146  char host_name[MPI_MAX_PROCESSOR_NAME];
147  ierror = MPI_Get_processor_name(host_name, &result_len);
148  if (ierror >= 0)
149  {
150  INFOMSG("CommSHMEMPI: Started job " << myHost << " on host `");
151  INFOMSG(host_name << "'." << endl);
152  }
153  else
154  {
155  ERRORMSG("CommSHMEMPI: MPI_Get_processor_name failed" << endl);
156  }
157 
158  // receive message from the master node
159  int checknode;
160  MPI_Recv(&checknode, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD,
161  &stat);
162 
163  if (checknode != 0)
164  WARNMSG("CommSHMEMPI: Child received bad message during startup." << endl);
165 
166  // send back an acknowledgement
167  MPI_Send(&myHost, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD);
168 
169  }
170 
171  // set up the contexts and processes arrays properly
172  if (TotalNodes > 1)
173  {
174  std::vector<int> proccount;
175  proccount.push_back(1);
176  for (i = 1; i < TotalNodes; i++)
177  {
178  Contexts.push_back(1);
179  Processes.push_back(proccount);
180  }
181  }
182 }
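// --------------------------------------------------------------------------
// Illustrative sketch (not part of the original CommSHMEMPI.cpp): a minimal,
// hypothetical driver showing how the constructor above would be exercised.
// In practice the framework normally creates the Communicate object itself;
// the getNodes()/myNode() accessor names are taken from Communicate.h and the
// driver structure is purely an assumption for this example.
#if 0
int main(int argc, char *argv[])
{
    // procs = -1 means "use the default", i.e. the size handed to mpirun
    CommSHMEMPI comm(argc, argv, -1);

    // after construction every rank knows its own id and the node count
    int me    = comm.myNode();
    int nodes = comm.getNodes();

    // ... application work ...
    return 0;
}
#endif
// --------------------------------------------------------------------------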
183 
184 
186 // class destructor
187 CommSHMEMPI::~CommSHMEMPI(void)
188 {
189 
190  int i, dieCode = 0;
191  MPI_Status stat;
192 
193  // on all nodes, when running in parallel, get any extra messages not
194  // yet received
195  if (TotalNodes > 1)
196  {
197  int trial, node, tag;
198  Message *msg;
199  for (trial = 0; trial < 50000; ++trial)
200  {
201  do
202  {
203  node = COMM_ANY_NODE;
204  tag = COMM_ANY_TAG;
205  msg = myreceive(node, tag, COMM_SEND_TAG);
206  if (msg != 0 && tag != IPPL_ABORT_TAG && tag != IPPL_EXIT_TAG)
207  {
208  WARNMSG("CommSHMEMPI: Found extra message from node " << node);
209  WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
210  }
211  }
212  while (msg != 0);
213  }
214  }
215 
216  // broadcast a message to all other nodes to tell them to quit
217  if (myNode() == 0)
218  {
219  // on master node, send out messages
220  for (i = 1; i < TotalNodes; i++)
221  {
222  MPI_Send(&dieCode, 1, MPI_INT, i, COMM_DIE_TAG, MPI_COMM_WORLD);
223 
224  }
225  }
226  else
227  {
228  // on client nodes, receive message
229  MPI_Recv(&dieCode, 1, MPI_INT, 0, COMM_DIE_TAG, MPI_COMM_WORLD, &stat);
230 
231  }
232 
233  MPI_Finalize();
234 }
235 
236 
238 // take the data from a Message object and pack it into the current send buf.
239 // each message is packed in this order:
240 // tag, sending node, number of items (3-int array)
241 // type of item 1 (short)
242 // size of item 1, in number of elements (int)
243 // item 1 data (various)
244 // ...
245 // type of item N (short)
246 // size of item N, in number of elements (int)
247 // item N data (various)
248 void *CommSHMEMPI::pack_message(Message *msg, int tag, int &buffsize)
249 {
250  // calculate size of buffer
251  buffsize = find_msg_length(*msg);
252 
253  // allocate storage for buffer
254  void *pos = (buffsize > PACKSIZE) ? makebuffer(buffsize) : shmempipackbuf;
255 
256  // pack message data and return the necessary pointer
257  fill_msg_buffer(pos, *msg, tag, buffsize);
258  return pos;
259 }
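// --------------------------------------------------------------------------
// Illustrative sketch (not part of the original file): the comment above
// describes the packed wire layout only informally, so here is one possible
// picture of it. The exact field widths and any padding are determined by
// find_msg_length()/fill_msg_buffer() in Communicate, so treat this purely
// as a schematic, not as the authoritative format.
#if 0
struct PackedHeader          // 3-int header written first
{
    int tag;                 // user tag embedded in the packed data
    int sender;              // sending node
    int nitems;              // number of items that follow
};

struct PackedItemRecord      // repeated nitems times, each followed by data
{
    short type;              // type code of the item
    int   elements;          // size of the item, in number of elements
    // ... raw data for 'elements' elements follows here ...
};
#endif
// --------------------------------------------------------------------------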
260 
261 
263 // send a message ... arguments are the Message itself, the
264 // destination node, the 'user' tag, and the 'encoding' tag.
265 // Messages should be sent via the underlying mechanism by using the
266 // encoding tag (one of the COMM_ tags),
267 // and should embed the information about what the user
268 // tag is in the data sent between nodes. Return success.
269 bool CommSHMEMPI::mysend(Message *msg, int node, int tag, int etag)
270 {
271 
272  int nitems = msg->size();
273  int errstat = (-1);
274  int flag = false;
275  MPI_Request request;
276  MPI_Status status;
277 
278  MPI_Status rec_status;
279  int src_node, rec_node, rec_tag, rec_size, rec_utag, bufid, rec_flag = 0;
280  Message* newmsg = NULL;
281 
282  // pack the message data into the buffer
283  int size;
284  void *outbuffer = pack_message(msg, tag, size);
285 
286  // send the message (non-blocking)
287  errstat = MPI_Isend(outbuffer, size, MPI_BYTE, node, etag,
288  MPI_COMM_WORLD, &request);
289 
290 
291  while (!flag)
292  {
293  // get info about messages to be received
294  bufid = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
295  &rec_flag, &rec_status);
296  if ( (bufid >= 0) && (rec_flag != 0) )
297  {
298  // a message is available to be received
299  src_node = rec_status.MPI_SOURCE;
300  rec_tag = rec_status.MPI_TAG;
301  MPI_Get_count(&rec_status, MPI_BYTE, &rec_size);
302  if ( (rec_size >= 0) && (rec_tag >= 0) && (src_node >= 0) )
303  {
304  // message is a valid one, so malloc the output buffer
305  void *rec_buff = makebuffer(rec_size);
306 
307  // blocking receive, unpack message
308  MPI_Recv(rec_buff, rec_size, MPI_BYTE, src_node, rec_tag,
309  MPI_COMM_WORLD, &rec_status);
310 
311  newmsg = unpack_message(rec_node, rec_utag, rec_buff);
312 
313  // tell this new Message that we were the one that created its
314  // storage buffer, so that when the Message is deleted, we can
315  // be told about it in order to free the storage.
316  newmsg->useCommunicate(this, rec_buff);
317 
318  // put message in my message queue
319  if (add_msg(newmsg,rec_node,rec_utag))
320  {
321  newmsg = NULL; // reset message pointer
322  rec_flag = 0; // reset receive flag
323  }
324  }
325  }
326 
327  // check for completion of send
328  MPI_Test(&request, &flag, &status);
329  }
330 
331  // free up the send buffer
332  if (size > PACKSIZE)
333  freebuffer(outbuffer);
334 
335  // return the success of the operation
336  return (errstat == 0);
337 }
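// --------------------------------------------------------------------------
// Illustrative sketch (not part of the original file): how a send maps onto
// mysend() and the user-tag / encoding-tag split described above. The tag
// value, the Message::put(begin, end) call, and the use of COMM_SEND_TAG as
// the encoding tag are assumptions for the example; mysend() may only be
// reachable through Communicate's public send() wrapper at application level.
#if 0
void example_send(CommSHMEMPI &comm, int destNode)
{
    Message *msg = new Message;
    double payload[4] = {1.0, 2.0, 3.0, 4.0};
    msg->put(payload, payload + 4);          // put interface assumed

    int userTag = 100;                       // arbitrary user-level tag
    bool ok = comm.mysend(msg, destNode, userTag, COMM_SEND_TAG);
    // on success the user tag travels inside the packed data, while
    // COMM_SEND_TAG is what MPI actually sees as the message tag
}
#endif
// --------------------------------------------------------------------------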
338 
339 
341 // receive a message from the given node and user tag. Return a NEW
342 // Message object if a message arrives, or NULL if no message available.
343 // node will be set to the node from which the message was sent.
344 // tag will be set to the 'user tag' for that message.
345 // etag is the 'encoding' tag, and must be one of the COMM_ tags.
346 // Only messages sent via the underlying mechanism with the
347 // given etag are checked. When one is found, the user tag and sending
348 // node are extracted from the sent data.
349 // If node = COMM_ANY_NODE, checks for messages from any node.
350 // If tag = COMM_ANY_TAG, checks for messages with any user tag.
351 Message *CommSHMEMPI::myreceive(int& node, int& tag, int etag)
352 {
353 
354  int bufid, size, checknode, checktag, flag = false;
355  Message *newmsg = 0;
356  MPI_Status stat;
357 
358  checknode = (node < 0 || node >= TotalNodes ? MPI_ANY_SOURCE : node);
359  checktag = etag;
360 
361  // get info about message
362  bufid = MPI_Iprobe(checknode, checktag, MPI_COMM_WORLD, &flag, &stat);
363  if (bufid < 0)
364  {
365  // an error has occurred
366  ERRORMSG("CommSHMEMPI: cannot receive msg from node " << checknode);
367  ERRORMSG(", tag " << checktag << endl);
368 
369  if (MAX_SHMEMPI_ERRS > 0 && ++numErrors > MAX_SHMEMPI_ERRS)
370  {
371  ERRORMSG("Maximum number of MPI receive errors (" << numErrors);
372  ERRORMSG(") exceeded. MPI is hosed!!" << endl);
373  Ippl::abort();
374  }
375  }
376 
377  // if the message is actually available, see if we can get it now
378  if (flag == true)
379  {
380  MPI_Get_count(&stat,MPI_BYTE,&size);
381  if (size < 0)
382  {
383  ERRORMSG("CommSHMEMPI: received message has size " << size << endl);
384  }
385  else if ((stat.MPI_TAG != checktag) || (stat.MPI_TAG < 0))
386  {
387  ERRORMSG("CommSHMEMPI: received message with invalid tag ");
388  ERRORMSG(stat.MPI_TAG << endl);
389  }
390  else if (stat.MPI_SOURCE < 0)
391  {
392  ERRORMSG("CommSHMEMPI: received message from invalid source ");
393  ERRORMSG(stat.MPI_SOURCE << endl);
394  }
395  else
396  {
397  checknode = stat.MPI_SOURCE;
398  checktag = stat.MPI_TAG;
399 
400  // malloc the receive buffer
401  void *outbuff = makebuffer(size);
402 
403  // blocking receive
404  MPI_Recv(outbuff, size, MPI_BYTE, checknode, checktag,
405  MPI_COMM_WORLD, &stat);
406 
407  newmsg = unpack_message(node, tag, outbuff);
408 
409  // tell this new Message that we were the one that created its
410  // storage buffer, so that when the Message is deleted, we can
411  // be told about it in order to free the storage.
412  newmsg->useCommunicate(this, outbuff);
413  numErrors = 0;
414  }
415  }
416  else
417  {
418  // no message is available
419  DEBUGMSG(level2 << "CommSHMEMPI: No Message Received to Match Request");
420  DEBUGMSG(endl);
421  }
422 
423  // return the new Message, or NULL if no message available
424  return newmsg;
425 }
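// --------------------------------------------------------------------------
// Illustrative sketch (not part of the original file): polling for a message
// from any node with any user tag, as described above. Deleting the returned
// Message eventually triggers cleanupMessage() so the receive buffer is
// freed; the helper function and its structure are assumptions for this
// example, and myreceive() may only be reachable through Communicate's
// public receive() wrapper at application level.
#if 0
Message *example_poll(CommSHMEMPI &comm)
{
    int node = COMM_ANY_NODE;       // wildcard: accept any sender
    int tag  = COMM_ANY_TAG;        // wildcard: accept any user tag
    Message *msg = comm.myreceive(node, tag, COMM_SEND_TAG);
    if (msg != 0)
    {
        // node and tag now hold the actual sender and user tag
        // ... unpack and use msg ...
    }
    return msg;                     // NULL if nothing was pending
}
#endif
// --------------------------------------------------------------------------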
426 
427 
429 // Synchronize all processors (everybody waits for everybody
430 // else to get here before returning to calling function).
431 // Uses MPI barrier for all procs
432 void CommSHMEMPI::mybarrier(void)
433 {
434 
435 
436  MPI_Barrier(MPI_COMM_WORLD);
437 }
438 
439 
441 // clean up after a Message has been used (called by Message).
442 void CommSHMEMPI::cleanupMessage(void *d)
443 {
444 
445 
446  // need to free the allocated storage
447  freebuffer(d);
448 }