OPAL (Object Oriented Parallel Accelerator Library)  2021.1.99
OPAL
CommMPI.cpp
Go to the documentation of this file.
1 //
2 // CommMPI - MPI-specific communications object for use with the
3 // Ippl framework.
4 // Allows user to establish id's for available nodes, establish connections,
5 // and send/receive data.
6 //
7 // Copyright (c) 2008 - 2020, Paul Scherrer Institut, Villigen PSI, Switzerland
8 //
9 // All rights reserved
10 //
11 // This file is part of OPAL.
12 //
13 // OPAL is free software: you can redistribute it and/or modify
14 // it under the terms of the GNU General Public License as published by
15 // the Free Software Foundation, either version 3 of the License, or
16 // (at your option) any later version.
17 //
18 // You should have received a copy of the GNU General Public License
19 // along with OPAL. If not, see <https://www.gnu.org/licenses/>.
20 //
21 #include "Message/CommMPI.h"
22 #include "Message/Message.h"
23 #include "Utility/IpplInfo.h"
24 #include "Utility/PAssert.h"
25 
26 
28 
29 // include mpi header file
30 #include <mpi.h>
31 
32 #ifdef _OPENMP
33  #include <omp.h>
34 #endif
35 
36 #include <cstring>
37 #include <cstdlib>
38 #include <unistd.h>
39 #include <vector>
40 
// Upper bound on consecutive MPI probe errors tolerated by myreceive before
// CommMPI gives up and aborts. A non-positive value disables the check.
static const int MAX_MPI_ERRS = 500;

// Running count of MPI receive errors (reset to zero on a successful receive).
static int numErrors = 0;
static int size_of_MPI_INT; /* needed for tracing */

// Static scratch buffer used for speed: any packed message of at most
// PACKSIZE bytes is written here instead of into a freshly allocated buffer.
static const int PSIZE = 1024 * 16;
static const std::size_t PACKSIZE = PSIZE * sizeof(long);
static long mpipackbuf[PSIZE];

59 // constructor. arguments: command-line args, and number of processes
60 // to start (if < 0, start the 'default' number, i.e. the number of
61 // hosts in a MPI virtual machine, the number of nodes in an O2K, etc)
62 // Note: The base-class constructor does not need the argument info or
63 // the number of nodes, it just by default sets the number of nodes=1
64 // The final argument indicates whether to run MPI_Init or not; IPPL
65 // may be run as another user of MPI, in a context where MPI_Init has
66 // already been called, in which case it can skip that step.
67 CommMPI::CommMPI(int& argc , char**& argv, int procs, bool mpiinit, MPI_Comm mpicomm)
68  : Communicate(argc, argv, procs), weInitialized(mpiinit)
69 {
70 
71  int i, reported, rep_host, ierror, result_len;
72  MPI_Status stat;
73  char *currtok, *nexttok, *execname;
74 
75  // a little "string magic" to strip the absolute pathname off the executable
76  currtok = strstr(argv[0],"/");
77  if (!currtok)
78  {
79  execname = strdup(argv[0]);
80  }
81  else
82  {
83  currtok++;
84  nexttok = strstr(currtok,"/");
85  while (nexttok)
86  {
87  currtok = nexttok+1;
88  nexttok = strstr(currtok,"/");
89  }
90  execname = strdup(currtok);
91  }
92 
93  // initialize mpi
94  if (weInitialized) {
95 #ifdef _OPENMP
96  int provided = 0;
97  MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
98  INFOMSG("Ippl will be initialized with " <<
99  omp_get_max_threads() << " OMP threads\n");
100 
101  if ( provided != MPI_THREAD_FUNNELED )
102  ERRORMSG("CommMPI: Didn't get requested MPI-OpenMP setting.\n");
103 #else
104  MPI_Init(&argc, &argv);
105 #endif
106  }
107  //else
108  // INFOMSG("NOT initializing MPI = " << endl);
109 
110  // restore original executable name without absolute path
111  strcpy(argv[0],execname);
112 
113  free(execname);
114 
115  // duplicate the MPI_COMM_WORLD communicator, so that we can use
116  // a communicator that will not conflict with other users of MPI_COMM_WORLD
117  MPI_Comm_dup(mpicomm, &communicator);
118 
119  // determine the number of nodes running and my node number
120  MPI_Comm_size(communicator,&TotalNodes);
121  MPI_Comm_rank(communicator,&myHost);
122 
123  // make sure we do not have too many processes running
124  if (procs > 0 && procs < TotalNodes)
125  {
126  // if this is a process that is beyond what we had requested, just exit
127  if (myHost >= procs)
128  Ippl::abort();
129  TotalNodes = procs;
130  }
131 
132  MPI_Type_size ( MPI_INT, &size_of_MPI_INT );
133  if (myHost == 0) // this code is run by the master process
134  {
135  // send a messages to each child node
136  for (i = 1; i < TotalNodes; i++)
137  {
138  MPI_Send(&myHost, 1, MPI_INT, i, COMM_HOSTS_TAG, communicator);
139  }
140 
141  // wait for the spawned processes to report back that they're ready
142  //~ int *child_ready = new int[TotalNodes];
143  std::vector<int> child_ready(TotalNodes);
144  for (i = 0; i < TotalNodes; child_ready[i++] = 0)
145  ;
146  INFOMSG(level5 << "CommMPI: Parent process waiting for children ..." << endl);
147  reported = 1; // since the parent is already ready
148  while (reported < TotalNodes)
149  {
150  ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
151  COMM_HOSTS_TAG, communicator, &stat);
152 
153  if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host]))
154  {
155  child_ready[rep_host] = 1;
156  reported++;
157  INFOMSG(level5 << "CommMPI: Child " << rep_host << " ready." << endl);
158  }
159  else
160  {
161  ERRORMSG("CommMPI: Error with child reporting to parent. ");
162  ERRORMSG("rep_host = " << rep_host);
163  ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
164  }
165  }
166 
167  //~ delete [] child_ready;
168  INFOMSG(level5 << "CommMPI: Initialization complete." << endl);
169 
170  }
171  else // this is a child process; get data from pops
172  {
173  char host_name[MPI_MAX_PROCESSOR_NAME];
174  ierror = MPI_Get_processor_name(host_name, &result_len);
175  if (ierror >= 0)
176  {
177  INFOMSG(level5 << "CommMPI: Started job " << myHost << " on host `");
178  INFOMSG(host_name << "'." << endl);
179  }
180  else
181  {
182  ERRORMSG("CommMPI: failed" << endl);
183  }
184 
185  // receive message from the master node
186  int checknode;
187  MPI_Recv(&checknode, 1, MPI_INT, 0, COMM_HOSTS_TAG, communicator,
188  &stat);
189 
190  if (checknode != 0)
191  WARNMSG("CommMPI: Child received bad message during startup." << endl);
192 
193  // send back an acknowledgement
194  MPI_Send(&myHost, 1, MPI_INT, 0, COMM_HOSTS_TAG, communicator);
195 
196  }
197 
198  // set up the contexts and processes arrays properly
199  if (TotalNodes > 1)
200  {
201  std::vector<int> proccount;
202  proccount.push_back(1);
203  for (i = 1; i < TotalNodes; i++)
204  {
205  Contexts.push_back(1);
206  Processes.push_back(proccount);
207  }
208  }
209 
210 }
211 
212 
214 // class destructor
216 {
217 
218  int i, dieCode = 0;
219  MPI_Status stat;
220 
221  // on all nodes, when running in parallel, get any extra messages not
222  // yet received
223  if (TotalNodes > 1)
224  {
225  int trial, node, tag;
226  Message *msg;
227  for (trial = 0; trial < 50000; ++trial)
228  {
229  do
230  {
231  node = COMM_ANY_NODE;
232  tag = COMM_ANY_TAG;
233  msg = myreceive(node, tag, COMM_SEND_TAG);
234  if (msg != 0 && tag != IPPL_ABORT_TAG && tag != IPPL_EXIT_TAG)
235  {
236  WARNMSG("CommMPI: Found extra message from node " << node);
237  WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
238  }
239  }
240  while (msg != 0);
241  }
242  }
243 
244  // broadcast a message to all other nodes to tell them to quit
245  if (myNode() == 0)
246  {
247  // on master node, send out messages
248  for (i = 1; i < TotalNodes; i++)
249  {
250  MPI_Send(&dieCode, 1, MPI_INT, i, COMM_DIE_TAG, communicator);
251 
252  }
253  }
254  else
255  {
256  // on client nodes, receive message
257  MPI_Recv(&dieCode, 1, MPI_INT, 0, COMM_DIE_TAG, communicator, &stat);
258 
259  }
260 
261  MPI_Barrier(communicator);
262  // delete the communicator we used
263  MPI_Comm_free(&communicator);
264 
265  // if we did our own initialization, also do finalize operation. But
266  // if we did not initialize, skip the finalize as well
267 
268  /*
269  ada: it make no sense to call finalize from the application,
270  because the IPPL destructor is called AFTER that, which causes:
271  "0032-151 MPI is already finalized in string, task number"
272  */
273  if (weInitialized)
274  MPI_Finalize();
275 }
276 
277 
279 // take the data from a Message object and pack it into the current send buf.
280 // each message is packed in this order:
281 // tag, sending node, number of items (3-int array)
282 // type of item 1 (short)
283 // size of item 1, in number of elements (int)
284 // item 1 data (various)
285 // ...
286 // type of item N (short)
287 // size of item N, in number of elements (int)
288 // item N data (various)
289 void *CommMPI::pack_message(Message *msg, int tag, int &buffsize, int node)
290 {
291 
292  // calculate size of buffer
293  buffsize = find_msg_length(*msg);
294 
295  // allocate storage for buffer
296  void *pos = ((unsigned int) buffsize > PACKSIZE) ? makebuffer(buffsize) : mpipackbuf;
297 
298  // pack message data and return the necessary pointer
299  fill_msg_buffer(pos, *msg, tag, buffsize, node);
300  return pos;
301 }
302 
303 
305 // send a message ... arguments are the Message itself, the
306 // destination node, the 'user' tag, and the 'encoding' tag.
307 // Messages should be sent via the underlying mechanism by using the
308 // encoding tag (one of the COMM_ tags),
309 // and should embed the information about what the user
310 // tag is in the data sent between nodes. Return success.
311 bool CommMPI::mysend(Message *msg, int node, int tag, int etag)
312 {
313 
314  int errstat = (-1);
315  int flag = false;
316  MPI_Request request;
317  MPI_Status status;
318 
319  MPI_Status rec_status;
320  int src_node, rec_node, rec_tag, rec_size, rec_utag, bufid, rec_flag = 0;
321  Message* newmsg = NULL;
322 
323  // pack the message data into the buffer
324  int size;
325  void *outbuffer = pack_message(msg, tag, size, node);
326 
327  // send the message (non-blocking)
328  // Inform dbgmsg("CommMPI", INFORM_ALL_NODES);
329  // dbgmsg << "Sending MPI message of size " << size << " to node " << node;
330  // dbgmsg << " with tag " << tag << "." << endl;
331 
332  //messaging "profiler"
334 
335  errstat = MPI_Isend(outbuffer, size, MPI_BYTE, node, etag,
336  communicator, &request);
337 
338 
339  while (!flag)
340  {
341  if (!Ippl::retransmit())
342  {
343  // get info about messages to be received
344  bufid = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, communicator,
345  &rec_flag, &rec_status);
346  if ((bufid >= 0) && (rec_flag != 0) )
347  {
348  // a message is available to be received
349  src_node = rec_status.MPI_SOURCE;
350  rec_tag = rec_status.MPI_TAG;
351  MPI_Get_count(&rec_status, MPI_BYTE, &rec_size);
352  // dbgmsg<<"Receiving MPI message of size " << rec_size << " from node ";
353  // dbgmsg << src_node << "." << endl;
354  if ( (rec_size >= 0) && (rec_tag >= 0) && (src_node >= 0) )
355  {
356  // message is a valid one, so malloc the output buffer
357  void *rec_buff = makebuffer(rec_size);
358 
359  // blocking receive, unpack message
360  MPI_Recv(rec_buff, rec_size, MPI_BYTE, src_node, rec_tag,
361  communicator, &rec_status);
362 
363  newmsg = unpack_message(rec_node, rec_utag, rec_buff);
364 
365  // if there was an error unpacking, then the message had a problem
366  // and is invalid, so throw this one away
367  if (newmsg == 0)
368  {
369  // free up the buffer
370  cleanupMessage(rec_buff);
371 
372  }
373  else
374  {
375  // tell the message to inform us when the buffer is finished
376  newmsg->useCommunicate(this, rec_buff);
377 
378  // put message in my message queue
379  add_msg(newmsg, rec_node, rec_utag);
380  }
381 
382  // reset other receive information
383  newmsg = NULL; // reset message pointer
384  rec_flag = 0; // reset receive flag
385  }
386  }
387  }
388 
389  // check for completion of send
390  MPI_Test(&request, &flag, &status);
391  }
392 
393  // free up the send buffer
394  if ((unsigned int) size > PACKSIZE)
395  freebuffer(outbuffer);
396 
397  // return the success of the operation
398  return (errstat == 0);
399 }
400 
401 
403 // receive a message from the given node and user tag. Return a NEW
404 // Message object if a message arrives, or NULL if no message available.
405 // node will be set to the node from which the message was sent.
406 // tag will be set to the 'user tag' for that message.
407 // etag is the 'encoding' tag, and must be one of the COMM_ tags.
408 // Only message sent via the underlying mechanism with the
409 // given etag are checked. When one is found, the user tag and sending
410 // node are extracted from the sent data.
411 // If node = COMM_ANY_NODE, checks for messages from any node.
412 // If tag = COMM_ANY_TAG, checks for messages with any user tag.
413 Message *CommMPI::myreceive(int& node, int& tag, int etag)
414 {
415 
416  int bufid, size, checknode, checktag, flag = false;
417  Message *newmsg = 0;
418  MPI_Status stat;
419 
420  checknode = (node < 0 || node >= TotalNodes ? MPI_ANY_SOURCE : node);
421  checktag = etag;
422 
423  // get info about message
424  bufid = MPI_Iprobe(checknode, checktag, communicator, &flag, &stat);
425  if (bufid < 0)
426  {
427  // an error has occurred
428  ERRORMSG("CommMPI: cannot receive msg from node " << checknode);
429  ERRORMSG(", tag " << checktag << endl);
430 
431  if (MAX_MPI_ERRS > 0 && ++numErrors > MAX_MPI_ERRS)
432  {
433  ERRORMSG("Maximum number of MPI receive errors (" << numErrors);
434  ERRORMSG(") exceeded. MPI is hosed!!" << endl);
435  Ippl::abort();
436  }
437  }
438 
439  // if the message is actually available, see if we can get it now
440  if (flag == true)
441  {
442  MPI_Get_count(&stat,MPI_BYTE,&size);
443  if (size < 0)
444  {
445  ERRORMSG("CommMPI: received message has size " << size << endl);
446  }
447  else if ((stat.MPI_TAG != checktag) || (stat.MPI_TAG < 0))
448  {
449  ERRORMSG("CommMPI: received message with invalid tag ");
450  ERRORMSG(stat.MPI_TAG << endl);
451  }
452  else if (stat.MPI_SOURCE < 0)
453  {
454  ERRORMSG("CommMPI: received message from invalid source ");
455  ERRORMSG(stat.MPI_SOURCE << endl);
456  }
457  else
458  {
459  checknode = stat.MPI_SOURCE;
460  checktag = stat.MPI_TAG;
461 
462  // malloc the receive buffer
463  void *outbuff = makebuffer(size);
464 
465  // blocking receive
466  // Inform dbgmsg("CommMPI", INFORM_ALL_NODES);
467  // dbgmsg << "Receiving MPI message of size " << size << " from node ";
468  // dbgmsg << checknode << "." << endl;
469  MPI_Recv(outbuff, size, MPI_BYTE, checknode, checktag,
470  communicator, &stat);
471 
472  newmsg = unpack_message(node, tag, outbuff);
473 
474  // if there was an error unpacking, then the message had a problem
475  // and is invalid, so throw this one away
476  if (newmsg == 0)
477  {
478  // free up the buffer
479  cleanupMessage(outbuff);
480  }
481  else
482  {
483  // tell the message to notify us when its done with the buffer
484  newmsg->useCommunicate(this, outbuff);
485  }
486 
487  // zero out the count of MPI-specific errors
488  numErrors = 0;
489  }
490 
491  }
492 
493  // return the new Message, or NULL if no message available
494  return newmsg;
495 }
496 
497 
499 // Synchronize all processors (everybody waits for everybody
500 // else to get here before returning to calling function).
501 // Uses MPI barrier for all procs
503 {
504  MPI_Barrier(communicator);
505 }
506 
507 
509 // resent a message buffer that has been previously packed and copied
510 // into the provided buffer. Return success.
511 bool CommMPI::resend(void *buf, int buffsize, int node, int etag)
512 {
513 
514  //Inform dbgmsg("CommMPI::resend", INFORM_ALL_NODES);
515  //dbgmsg << "About to resend buffer of size " << buffsize << " to node ";
516  //dbgmsg << node << " with etag = " << etag << endl;
517 
518  // this will only work if we're sending to another node
519  PInsist(node != myNode(), "Can only retransmit to other nodes");
520 
522 
523  // send the buffer out
524  MPI_Request request;
525  int errstat = MPI_Isend(buf, buffsize, MPI_BYTE, node, etag,
526  communicator, &request);
527 
528 
529  int flag = false;
530  MPI_Status status;
531  while (!flag)
532  {
533  // check for completion of send
534  MPI_Test(&request, &flag, &status);
535  }
536 
537  // return the success of the operation
538  return (errstat == 0);
539 }
540 
541 
543 // clean up after a Message has been used (called by Message).
545 {
546  // need to free the allocated storage
547  freebuffer(d);
548 }
549 
550 
551 bool CommMPI::raw_send(void *data, int size, int node, int tag)
552 {
554 
555  return MPI_Send(data, size, MPI_BYTE, node, tag, communicator)
556  == MPI_SUCCESS;
557 }
558 
559 MPI_Request CommMPI::raw_isend(void *data, int size, int node, int tag)
560 {
561  MPI_Request request;
562 
564 
565  MPI_Isend(data, size, MPI_BYTE, node, tag, communicator, &request);
566  return request;
567 }
568 
569 int CommMPI::raw_receive(char *data, int size, int &node, int &tag)
570 {
571  if (node == COMM_ANY_NODE)
572  node = MPI_ANY_SOURCE;
573  if (tag == COMM_ANY_TAG)
574  tag = MPI_ANY_TAG;
575 
576  MPI_Status stat;
577  MPI_Recv(data, size, MPI_BYTE, node, tag, communicator, &stat);
578 
579  node = stat.MPI_SOURCE;
580  tag = stat.MPI_TAG;
581  int count;
582  MPI_Get_count(&stat, MPI_BYTE, &count);
583  return count;
584 }
585 
586 MPI_Request CommMPI::raw_ireceive(char *buf, int size, int node, int tag)
587 {
588  if (node == COMM_ANY_NODE)
589  node = MPI_ANY_SOURCE;
590  if (tag == COMM_ANY_TAG)
591  tag = MPI_ANY_TAG;
592 
593  MPI_Request request;
594  MPI_Irecv(buf, size, MPI_BYTE, node, tag, communicator, &request);
595 
596  return request;
597 }
598 
599 int CommMPI::raw_probe_receive(char *&data, int &node, int &tag)
600 {
601  if (node == COMM_ANY_NODE)
602  node = MPI_ANY_SOURCE;
603  if (tag == COMM_ANY_TAG)
604  tag = MPI_ANY_TAG;
605  MPI_Status stat;
606 
607  MPI_Probe(node, tag, communicator, &stat);
608  int count;
609  MPI_Get_count(&stat, MPI_BYTE, &count);
610  if(count>0)
611  data = new char[count];
612  else
613  data = 0;
614  node = stat.MPI_SOURCE;
615  tag = stat.MPI_TAG;
616 
617  MPI_Recv(data, count, MPI_BYTE, node, tag, communicator, &stat);
618 
619  return count;
620 }
#define MAX_MPI_ERRS
Definition: CommMPI.cpp:44
#define PACKSIZE
Definition: CommMPI.cpp:53
#define PSIZE
Definition: CommMPI.cpp:52
#define IPPL_EXIT_TAG
Definition: Tags.h:26
#define IPPL_ABORT_TAG
Definition: Tags.h:25
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
Inform & level5(Inform &inf)
Definition: Inform.cpp:49
#define ERRORMSG(msg)
Definition: IpplInfo.h:350
#define INFOMSG(msg)
Definition: IpplInfo.h:348
#define WARNMSG(msg)
Definition: IpplInfo.h:349
#define PInsist(c, m)
Definition: PAssert.h:120
CommMPI(int &argc, char **&argv, int procs=(-1), bool mpiinit=true, MPI_Comm mpicomm=MPI_COMM_WORLD)
Definition: CommMPI.cpp:67
virtual bool raw_send(void *, int size, int node, int tag)
Definition: CommMPI.cpp:551
bool weInitialized
Definition: CommMPI.h:99
virtual bool resend(void *buf, int size, int node, int etag)
Definition: CommMPI.cpp:511
MPI_Comm communicator
Definition: CommMPI.h:96
virtual MPI_Request raw_isend(void *, int size, int node, int tag)
Definition: CommMPI.cpp:559
virtual Message * myreceive(int &node, int &tag, int etag)
Definition: CommMPI.cpp:413
virtual void mybarrier(void)
Definition: CommMPI.cpp:502
virtual MPI_Request raw_ireceive(char *buf, int size, int node, int tag)
Definition: CommMPI.cpp:586
virtual ~CommMPI(void)
Definition: CommMPI.cpp:215
virtual int raw_probe_receive(char *&, int &node, int &tag)
Definition: CommMPI.cpp:599
virtual bool mysend(Message *, int node, int utag, int etag)
Definition: CommMPI.cpp:311
void * pack_message(Message *msg, int tag, int &buffsize, int node)
Definition: CommMPI.cpp:289
virtual void cleanupMessage(void *)
Definition: CommMPI.cpp:544
virtual int raw_receive(char *, int size, int &node, int &tag)
Definition: CommMPI.cpp:569
static void * makebuffer(int size)
Definition: Communicate.h:339
std::vector< int > Contexts
Definition: Communicate.h:264
static void freebuffer(void *buf)
Definition: Communicate.h:343
int find_msg_length(Message &)
std::vector< std::vector< int > > Processes
Definition: Communicate.h:265
bool add_msg(Message *, int, int)
Definition: Communicate.cpp:97
Message * unpack_message(int &node, int &tag, void *pos)
int myNode() const
Definition: Communicate.h:155
void fill_msg_buffer(void *, Message &, int, int, int)
void useCommunicate(Communicate *c, void *d)
Definition: Message.h:358
static void abort(const char *=0)
Definition: IpplInfo.cpp:616
static bool retransmit()
Definition: IpplInfo.h:195
static IpplMessageCounter & getInstance()