OPAL (Object Oriented Parallel Accelerator Library)  2.2.0
OPAL
CommMPI.cpp
Go to the documentation of this file.
1 // -*- C++ -*-
2 /***************************************************************************
3  *
4  * The IPPL Framework
5  *
6  * This program was prepared by PSI.
7  * All rights in the program are reserved by PSI.
8  * Neither PSI nor the author(s)
9  * makes any warranty, express or implied, or assumes any liability or
10  * responsibility for the use of this software
11  *
12  * Visit http://www.amas.web.psi.ch for more details
13  *
14  ***************************************************************************/
15 
16 // -*- C++ -*-
17 /***************************************************************************
18  *
19  * The IPPL Framework
20  *
21  *
22  * Visit http://people.web.psi.ch/adelmann/ for more details
23  *
24  ***************************************************************************/
25 
26 // include files
27 #include "Message/CommMPI.h"
28 #include "Message/Message.h"
29 #include "Utility/IpplInfo.h"
30 #include "Utility/PAssert.h"
31 
32 
34 
35 // include mpi header file
36 #include <mpi.h>
37 
38 #ifdef _OPENMP
39  #include <omp.h>
40 #endif
41 
42 #include <cstring>
43 #include <cstdlib>
44 #include <unistd.h>
45 #include <vector>
46 
47 // if an error occurs during myreceive more times than this, CommMPI
48 // will just exit. Make it negative to totally disable checking for a
49 // maximum number of errors
50 #define MAX_MPI_ERRS 500
51 
52 
53 // static data to keep track of errors
54 static int numErrors = 0;
55 #if defined(IPPL_MPIXX)
56 static MPI_Aint size_of_MPI_INT; /* needed for tracing */
57 #else
58 static int size_of_MPI_INT; /* needed for tracing */
59 #endif
60 
61 // temporary buffer used for speed
62 #define PSIZE 1024*16
63 #define PACKSIZE ((PSIZE)*sizeof(long))
64 static long mpipackbuf[PSIZE];
65 
66 
67 
69 // constructor. arguments: command-line args, and number of processes
70 // to start (if < 0, start the 'default' number, i.e. the number of
71 // hosts in a MPI virtual machine, the number of nodes in an O2K, etc)
72 // Note: The base-class constructor does not need the argument info or
73 // the number of nodes, it just by default sets the number of nodes=1
74 // The final argument indicates whether to run MPI_Init or not; IPPL
75 // may be run as another user of MPI, in a context where MPI_Init has
76 // already been called, in which case it can skip that step.
77 CommMPI::CommMPI(int& argc , char**& argv, int procs, bool mpiinit, MPI_Comm mpicomm)
78  : Communicate(argc, argv, procs), weInitialized(mpiinit)
79 {
80 
81  int i, reported, rep_host, ierror, result_len;
82  MPI_Status stat;
83  char *currtok, *nexttok, *execname;
84 
85  // a little "string magic" to strip the absolute pathname off the executable
86  currtok = strstr(argv[0],"/");
87  if (!currtok)
88  {
89  execname = strdup(argv[0]);
90  }
91  else
92  {
93  currtok++;
94  nexttok = strstr(currtok,"/");
95  while (nexttok)
96  {
97  currtok = nexttok+1;
98  nexttok = strstr(currtok,"/");
99  }
100  execname = strdup(currtok);
101  }
102 
103  // initialize mpi
104  if (weInitialized) {
105 #ifdef _OPENMP
106  int provided = 0;
107  MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
108  INFOMSG("Ippl will be initialized with " <<
109  omp_get_max_threads() << " OMP threads\n");
110 
111  if ( provided != MPI_THREAD_FUNNELED )
112  ERRORMSG("CommMPI: Didn't get requested MPI-OpenMP setting.\n");
113 #else
114  MPI_Init(&argc, &argv);
115 #endif
116  }
117  //else
118  // INFOMSG("NOT initializing MPI = " << endl);
119 
120  // restore original executable name without absolute path
121  strcpy(argv[0],execname);
122 
123  free(execname);
124 
125  // duplicate the MPI_COMM_WORLD communicator, so that we can use
126  // a communicator that will not conflict with other users of MPI_COMM_WORLD
127  MPI_Comm_dup(mpicomm, &communicator);
128 
129  // determine the number of nodes running and my node number
130  MPI_Comm_size(communicator,&TotalNodes);
131  MPI_Comm_rank(communicator,&myHost);
132 
133  // make sure we do not have too many processes running
134  if (procs > 0 && procs < TotalNodes)
135  {
136  // if this is a process that is beyond what we had requested, just exit
137  if (myHost >= procs)
138  Ippl::abort();
139  TotalNodes = procs;
140  }
141 
142  MPI_Type_size ( MPI_INT, &size_of_MPI_INT );
143  if (myHost == 0) // this code is run by the master process
144  {
145  // send a messages to each child node
146  for (i = 1; i < TotalNodes; i++)
147  {
148  MPI_Send(&myHost, 1, MPI_INT, i, COMM_HOSTS_TAG, communicator);
149  }
150 
151  // wait for the spawned processes to report back that they're ready
152  //~ int *child_ready = new int[TotalNodes];
153  std::vector<int> child_ready(TotalNodes);
154  for (i = 0; i < TotalNodes; child_ready[i++] = 0)
155  ;
156  INFOMSG("CommMPI: Parent process waiting for children ..." << endl);
157  reported = 1; // since the parent is already ready
158  while (reported < TotalNodes)
159  {
160  ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
161  COMM_HOSTS_TAG, communicator, &stat);
162 
163  if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host]))
164  {
165  child_ready[rep_host] = 1;
166  reported++;
167  INFOMSG("CommMPI: Child " << rep_host << " ready." << endl);
168  }
169  else
170  {
171  ERRORMSG("CommMPI: Error with child reporting to parent. ");
172  ERRORMSG("rep_host = " << rep_host);
173  ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
174  }
175  }
176 
177  //~ delete [] child_ready;
178  INFOMSG("CommMPI: Initialization complete." << endl);
179 
180  }
181  else // this is a child process; get data from pops
182  {
183  char host_name[MPI_MAX_PROCESSOR_NAME];
184  ierror = MPI_Get_processor_name(host_name, &result_len);
185  if (ierror >= 0)
186  {
187  INFOMSG("CommMPI: Started job " << myHost << " on host `");
188  INFOMSG(host_name << "'." << endl);
189  }
190  else
191  {
192  ERRORMSG("CommMPI: failed" << endl);
193  }
194 
195  // receive message from the master node
196  int checknode;
197  MPI_Recv(&checknode, 1, MPI_INT, 0, COMM_HOSTS_TAG, communicator,
198  &stat);
199 
200  if (checknode != 0)
201  WARNMSG("CommMPI: Child received bad message during startup." << endl);
202 
203  // send back an acknowledgement
204  MPI_Send(&myHost, 1, MPI_INT, 0, COMM_HOSTS_TAG, communicator);
205 
206  }
207 
208  // set up the contexts and processes arrays properly
209  if (TotalNodes > 1)
210  {
211  std::vector<int> proccount;
212  proccount.push_back(1);
213  for (i = 1; i < TotalNodes; i++)
214  {
215  Contexts.push_back(1);
216  Processes.push_back(proccount);
217  }
218  }
219 
220 }
221 
222 
224 // class destructor
226 {
227 
228  int i, dieCode = 0;
229  MPI_Status stat;
230 
231  // on all nodes, when running in parallel, get any extra messages not
232  // yet received
233  if (TotalNodes > 1)
234  {
235  int trial, node, tag;
236  Message *msg;
237  for (trial = 0; trial < 50000; ++trial)
238  {
239  do
240  {
241  node = COMM_ANY_NODE;
242  tag = COMM_ANY_TAG;
243  msg = myreceive(node, tag, COMM_SEND_TAG);
244  if (msg != 0 && tag != IPPL_ABORT_TAG && tag != IPPL_EXIT_TAG)
245  {
246  WARNMSG("CommMPI: Found extra message from node " << node);
247  WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
248  }
249  }
250  while (msg != 0);
251  }
252  }
253 
254  // broadcast a message to all other nodes to tell them to quit
255  if (myNode() == 0)
256  {
257  // on master node, send out messages
258  for (i = 1; i < TotalNodes; i++)
259  {
260  MPI_Send(&dieCode, 1, MPI_INT, i, COMM_DIE_TAG, communicator);
261 
262  }
263  }
264  else
265  {
266  // on client nodes, receive message
267  MPI_Recv(&dieCode, 1, MPI_INT, 0, COMM_DIE_TAG, communicator, &stat);
268 
269  }
270 
271  MPI_Barrier(communicator);
272  // delete the communicator we used
273  MPI_Comm_free(&communicator);
274 
275  // if we did our own initialization, also do finalize operation. But
276  // if we did not initialize, skip the finalize as well
277 
278  /*
279  ada: it make no sense to call finalize from the application,
280  because the IPPL destructor is called AFTER that, which causes:
281  "0032-151 MPI is already finalized in string, task number"
282  */
283  if (weInitialized)
284  MPI_Finalize();
285 }
286 
287 
289 // take the data from a Message object and pack it into the current send buf.
290 // each message is packed in this order:
291 // tag, sending node, number of items (3-int array)
292 // type of item 1 (short)
293 // size of item 1, in number of elements (int)
294 // item 1 data (various)
295 // ...
296 // type of item N (short)
297 // size of item N, in number of elements (int)
298 // item N data (various)
299 void *CommMPI::pack_message(Message *msg, int tag, int &buffsize, int node)
300 {
301 
302  // calculate size of buffer
303  buffsize = find_msg_length(*msg);
304 
305  // allocate storage for buffer
306  void *pos = ((unsigned int) buffsize > PACKSIZE) ? makebuffer(buffsize) : mpipackbuf;
307 
308  // pack message data and return the necessary pointer
309  fill_msg_buffer(pos, *msg, tag, buffsize, node);
310  return pos;
311 }
312 
313 
315 // send a message ... arguments are the Message itself, the
316 // destination node, the 'user' tag, and the 'encoding' tag.
317 // Messages should be sent via the underlying mechanism by using the
318 // encoding tag (one of the COMM_ tags),
319 // and should embed the information about what the user
320 // tag is in the data sent between nodes. Return success.
321 bool CommMPI::mysend(Message *msg, int node, int tag, int etag)
322 {
323 
324  int errstat = (-1);
325  int flag = false;
326  MPI_Request request;
327  MPI_Status status;
328 
329  MPI_Status rec_status;
330  int src_node, rec_node, rec_tag, rec_size, rec_utag, bufid, rec_flag = 0;
331  Message* newmsg = NULL;
332 
333  // pack the message data into the buffer
334  int size;
335  void *outbuffer = pack_message(msg, tag, size, node);
336 
337  // send the message (non-blocking)
338  // Inform dbgmsg("CommMPI", INFORM_ALL_NODES);
339  // dbgmsg << "Sending MPI message of size " << size << " to node " << node;
340  // dbgmsg << " with tag " << tag << "." << endl;
341 
342  //messaging "profiler"
344 
345  errstat = MPI_Isend(outbuffer, size, MPI_BYTE, node, etag,
346  communicator, &request);
347 
348 
349  while (!flag)
350  {
351  if (!Ippl::retransmit())
352  {
353  // get info about messages to be received
354  bufid = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, communicator,
355  &rec_flag, &rec_status);
356  if ((bufid >= 0) && (rec_flag != 0) )
357  {
358  // a message is available to be received
359  src_node = rec_status.MPI_SOURCE;
360  rec_tag = rec_status.MPI_TAG;
361  MPI_Get_count(&rec_status, MPI_BYTE, &rec_size);
362  // dbgmsg<<"Receiving MPI message of size " << rec_size << " from node ";
363  // dbgmsg << src_node << "." << endl;
364  if ( (rec_size >= 0) && (rec_tag >= 0) && (src_node >= 0) )
365  {
366  // message is a valid one, so malloc the output buffer
367  void *rec_buff = makebuffer(rec_size);
368 
369  // blocking receive, unpack message
370  MPI_Recv(rec_buff, rec_size, MPI_BYTE, src_node, rec_tag,
371  communicator, &rec_status);
372 
373  newmsg = unpack_message(rec_node, rec_utag, rec_buff);
374 
375  // if there was an error unpacking, then the message had a problem
376  // and is invalid, so throw this one away
377  if (newmsg == 0)
378  {
379  // free up the buffer
380  cleanupMessage(rec_buff);
381 
382  }
383  else
384  {
385  // tell the message to inform us when the buffer is finished
386  newmsg->useCommunicate(this, rec_buff);
387 
388  // put message in my message queue
389  add_msg(newmsg, rec_node, rec_utag);
390  }
391 
392  // reset other receive information
393  newmsg = NULL; // reset message pointer
394  rec_flag = 0; // reset receive flag
395  }
396  }
397  }
398 
399  // check for completion of send
400  MPI_Test(&request, &flag, &status);
401  }
402 
403  // free up the send buffer
404  if ((unsigned int) size > PACKSIZE)
405  freebuffer(outbuffer);
406 
407  // return the success of the operation
408  return (errstat == 0);
409 }
410 
411 
413 // receive a message from the given node and user tag. Return a NEW
414 // Message object if a message arrives, or NULL if no message available.
415 // node will be set to the node from which the message was sent.
416 // tag will be set to the 'user tag' for that message.
417 // etag is the 'encoding' tag, and must be one of the COMM_ tags.
418 // Only message sent via the underlying mechanism with the
419 // given etag are checked. When one is found, the user tag and sending
420 // node are extracted from the sent data.
421 // If node = COMM_ANY_NODE, checks for messages from any node.
422 // If tag = COMM_ANY_TAG, checks for messages with any user tag.
423 Message *CommMPI::myreceive(int& node, int& tag, int etag)
424 {
425 
426  int bufid, size, checknode, checktag, flag = false;
427  Message *newmsg = 0;
428  MPI_Status stat;
429 
430  checknode = (node < 0 || node >= TotalNodes ? MPI_ANY_SOURCE : node);
431  checktag = etag;
432 
433  // get info about message
434  bufid = MPI_Iprobe(checknode, checktag, communicator, &flag, &stat);
435  if (bufid < 0)
436  {
437  // an error has occurred
438  ERRORMSG("CommMPI: cannot receive msg from node " << checknode);
439  ERRORMSG(", tag " << checktag << endl);
440 
441  if (MAX_MPI_ERRS > 0 && ++numErrors > MAX_MPI_ERRS)
442  {
443  ERRORMSG("Maximum number of MPI receive errors (" << numErrors);
444  ERRORMSG(") exceeded. MPI is hosed!!" << endl);
445  Ippl::abort();
446  }
447  }
448 
449  // if the message is actually available, see if we can get it now
450  if (flag == true)
451  {
452  MPI_Get_count(&stat,MPI_BYTE,&size);
453  if (size < 0)
454  {
455  ERRORMSG("CommMPI: received message has size " << size << endl);
456  }
457  else if ((stat.MPI_TAG != checktag) || (stat.MPI_TAG < 0))
458  {
459  ERRORMSG("CommMPI: received message with invalid tag ");
460  ERRORMSG(stat.MPI_TAG << endl);
461  }
462  else if (stat.MPI_SOURCE < 0)
463  {
464  ERRORMSG("CommMPI: received message from invalid source ");
465  ERRORMSG(stat.MPI_SOURCE << endl);
466  }
467  else
468  {
469  checknode = stat.MPI_SOURCE;
470  checktag = stat.MPI_TAG;
471 
472  // malloc the receive buffer
473  void *outbuff = makebuffer(size);
474 
475  // blocking receive
476  // Inform dbgmsg("CommMPI", INFORM_ALL_NODES);
477  // dbgmsg << "Receiving MPI message of size " << size << " from node ";
478  // dbgmsg << checknode << "." << endl;
479  MPI_Recv(outbuff, size, MPI_BYTE, checknode, checktag,
480  communicator, &stat);
481 
482  newmsg = unpack_message(node, tag, outbuff);
483 
484  // if there was an error unpacking, then the message had a problem
485  // and is invalid, so throw this one away
486  if (newmsg == 0)
487  {
488  // free up the buffer
489  cleanupMessage(outbuff);
490  }
491  else
492  {
493  // tell the message to notify us when its done with the buffer
494  newmsg->useCommunicate(this, outbuff);
495  }
496 
497  // zero out the count of MPI-specific errors
498  numErrors = 0;
499  }
500 
501  }
502  else
503  {
504  // no message is available
505  DEBUGMSG(level2<<"CommMPI: No Message Received to Match Request"<<endl);
506  }
507 
508  // return the new Message, or NULL if no message available
509  return newmsg;
510 }
511 
512 
514 // Synchronize all processors (everybody waits for everybody
515 // else to get here before returning to calling function).
516 // Uses MPI barrier for all procs
518 {
519  MPI_Barrier(communicator);
520 }
521 
522 
524 // resent a message buffer that has been previously packed and copied
525 // into the provided buffer. Return success.
526 bool CommMPI::resend(void *buf, int buffsize, int node, int etag)
527 {
528 
529  //Inform dbgmsg("CommMPI::resend", INFORM_ALL_NODES);
530  //dbgmsg << "About to resend buffer of size " << buffsize << " to node ";
531  //dbgmsg << node << " with etag = " << etag << endl;
532 
533  // this will only work if we're sending to another node
534  PInsist(node != myNode(), "Can only retransmit to other nodes");
535 
537 
538  // send the buffer out
539  MPI_Request request;
540  int errstat = MPI_Isend(buf, buffsize, MPI_BYTE, node, etag,
541  communicator, &request);
542 
543 
544  int flag = false;
545  MPI_Status status;
546  while (!flag)
547  {
548  // check for completion of send
549  MPI_Test(&request, &flag, &status);
550  }
551 
552  // return the success of the operation
553  return (errstat == 0);
554 }
555 
556 
558 // clean up after a Message has been used (called by Message).
560 {
561  // need to free the allocated storage
562  freebuffer(d);
563 }
564 
565 
566 bool CommMPI::raw_send(void *data, int size, int node, int tag)
567 {
569 
570  return MPI_Send(data, size, MPI_BYTE, node, tag, communicator)
571  == MPI_SUCCESS;
572 }
573 
574 MPI_Request CommMPI::raw_isend(void *data, int size, int node, int tag)
575 {
576  MPI_Request request;
577 
579 
580  MPI_Isend(data, size, MPI_BYTE, node, tag, communicator, &request);
581  return request;
582 }
583 
584 int CommMPI::raw_receive(char *data, int size, int &node, int &tag)
585 {
586  if (node == COMM_ANY_NODE)
587  node = MPI_ANY_SOURCE;
588  if (tag == COMM_ANY_TAG)
589  tag = MPI_ANY_TAG;
590 
591  MPI_Status stat;
592  MPI_Recv(data, size, MPI_BYTE, node, tag, communicator, &stat);
593 
594  node = stat.MPI_SOURCE;
595  tag = stat.MPI_TAG;
596  int count;
597  MPI_Get_count(&stat, MPI_BYTE, &count);
598  return count;
599 }
600 
601 MPI_Request CommMPI::raw_ireceive(char *buf, int size, int node, int tag)
602 {
603  if (node == COMM_ANY_NODE)
604  node = MPI_ANY_SOURCE;
605  if (tag == COMM_ANY_TAG)
606  tag = MPI_ANY_TAG;
607 
608  MPI_Request request;
609  MPI_Irecv(buf, size, MPI_BYTE, node, tag, communicator, &request);
610 
611  return request;
612 }
613 
614 int CommMPI::raw_probe_receive(char *&data, int &node, int &tag)
615 {
616  if (node == COMM_ANY_NODE)
617  node = MPI_ANY_SOURCE;
618  if (tag == COMM_ANY_TAG)
619  tag = MPI_ANY_TAG;
620  MPI_Status stat;
621 
622  MPI_Probe(node, tag, communicator, &stat);
623  int count;
624  MPI_Get_count(&stat, MPI_BYTE, &count);
625  if(count>0)
626  data = new char[count];
627  else
628  data = 0;
629  node = stat.MPI_SOURCE;
630  tag = stat.MPI_TAG;
631 
632  MPI_Recv(data, count, MPI_BYTE, node, tag, communicator, &stat);
633 
634  return count;
635 }
virtual MPI_Request raw_ireceive(char *buf, int size, int node, int tag)
Definition: CommMPI.cpp:601
virtual void cleanupMessage(void *)
Definition: CommMPI.cpp:559
static void abort(const char *=0, int exitcode=(-1))
Definition: IpplInfo.cpp:696
bool add_msg(Message *, int, int)
Definition: Communicate.cpp:98
#define PACKSIZE
Definition: CommMPI.cpp:63
int myNode() const
Definition: Communicate.h:155
void fill_msg_buffer(void *, Message &, int, int, int)
bool weInitialized
Definition: CommMPI.h:97
static bool retransmit()
Definition: IpplInfo.h:221
#define ERRORMSG(msg)
Definition: IpplInfo.h:399
virtual int raw_receive(char *, int size, int &node, int &tag)
Definition: CommMPI.cpp:584
std::vector< std::vector< int > > Processes
Definition: Communicate.h:265
std::vector< int > Contexts
Definition: Communicate.h:264
int find_msg_length(Message &)
Inform & level2(Inform &inf)
Definition: Inform.cpp:46
#define MAX_MPI_ERRS
Definition: CommMPI.cpp:50
static IpplMessageCounter & getInstance()
#define IPPL_ABORT_TAG
Definition: Tags.h:25
virtual int raw_probe_receive(char *&, int &node, int &tag)
Definition: CommMPI.cpp:614
#define INFOMSG(msg)
Definition: IpplInfo.h:397
static void * makebuffer(int size)
Definition: Communicate.h:339
void * pack_message(Message *msg, int tag, int &buffsize, int node)
Definition: CommMPI.cpp:299
virtual bool mysend(Message *, int node, int utag, int etag)
Definition: CommMPI.cpp:321
#define WARNMSG(msg)
Definition: IpplInfo.h:398
void useCommunicate(Communicate *c, void *d)
Definition: Message.h:366
static void freebuffer(void *buf)
Definition: Communicate.h:343
virtual void mybarrier(void)
Definition: CommMPI.cpp:517
#define PSIZE
Definition: CommMPI.cpp:62
virtual Message * myreceive(int &node, int &tag, int etag)
Definition: CommMPI.cpp:423
virtual ~CommMPI(void)
Definition: CommMPI.cpp:225
virtual MPI_Request raw_isend(void *, int size, int node, int tag)
Definition: CommMPI.cpp:574
#define PInsist(c, m)
Definition: PAssert.h:135
Message * unpack_message(int &node, int &tag, void *pos)
#define DEBUGMSG(msg)
Definition: IpplInfo.h:405
#define IPPL_EXIT_TAG
Definition: Tags.h:26
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
virtual bool raw_send(void *, int size, int node, int tag)
Definition: CommMPI.cpp:566
virtual bool resend(void *buf, int size, int node, int etag)
Definition: CommMPI.cpp:526
MPI_Comm communicator
Definition: CommMPI.h:94
CommMPI(int &argc, char **&argv, int procs=(-1), bool mpiinit=true, MPI_Comm mpicomm=MPI_COMM_WORLD)
Definition: CommMPI.cpp:77