OPAL (Object Oriented Parallel Accelerator Library) 2022.1
OPAL
CommMPI.cpp
Go to the documentation of this file.
1//
2// CommMPI - MPI-specific communications object for use with the
3// Ippl framework.
4// Allows user to establish id's for available nodes, establish connections,
5// and send/receive data.
6//
7// Copyright (c) 2008 - 2020, Paul Scherrer Institut, Villigen PSI, Switzerland
8//
9// All rights reserved
10//
11// This file is part of OPAL.
12//
13// OPAL is free software: you can redistribute it and/or modify
14// it under the terms of the GNU General Public License as published by
15// the Free Software Foundation, either version 3 of the License, or
16// (at your option) any later version.
17//
18// You should have received a copy of the GNU General Public License
19// along with OPAL. If not, see <https://www.gnu.org/licenses/>.
20//
21#include "Message/CommMPI.h"
22#include "Message/Message.h"
23#include "Utility/IpplInfo.h"
24#include "Utility/PAssert.h"
25
26
28
29// include mpi header file
30#include <mpi.h>
31
32#ifdef _OPENMP
33 #include <omp.h>
34#endif
35
36#include <cstring>
37#include <cstdlib>
38#include <unistd.h>
39#include <vector>
40
41// if an error occurs during myreceive more times than this, CommMPI
42// will just exit. Make it negative to totally disable checking for a
43// maximum number of errors
44#define MAX_MPI_ERRS 500
45
46
47// static data to keep track of errors
48static int numErrors = 0;
49static int size_of_MPI_INT; /* needed for tracing */
50
51// temporary buffer used for speed
52#define PSIZE 1024*16
53#define PACKSIZE ((PSIZE)*sizeof(long))
54static long mpipackbuf[PSIZE];
55
56
57
59// constructor. arguments: command-line args, and number of processes
60// to start (if < 0, start the 'default' number, i.e. the number of
61// hosts in a MPI virtual machine, the number of nodes in an O2K, etc)
62// Note: The base-class constructor does not need the argument info or
63// the number of nodes, it just by default sets the number of nodes=1
64// The final argument indicates whether to run MPI_Init or not; IPPL
65// may be run as another user of MPI, in a context where MPI_Init has
66// already been called, in which case it can skip that step.
67CommMPI::CommMPI(int& argc , char**& argv, int procs, bool mpiinit, MPI_Comm mpicomm)
68 : Communicate(argc, argv, procs), weInitialized(mpiinit)
69{
70
71 int i, reported, rep_host, ierror, result_len;
72 MPI_Status stat;
73 char *currtok, *nexttok, *execname;
74
75 // a little "string magic" to strip the absolute pathname off the executable
76 currtok = strstr(argv[0],"/");
77 if (!currtok)
78 {
79 execname = strdup(argv[0]);
80 }
81 else
82 {
83 currtok++;
84 nexttok = strstr(currtok,"/");
85 while (nexttok)
86 {
87 currtok = nexttok+1;
88 nexttok = strstr(currtok,"/");
89 }
90 execname = strdup(currtok);
91 }
92
93 // initialize mpi
94 if (weInitialized) {
95#ifdef _OPENMP
96 int provided = 0;
97 MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
98 INFOMSG("Ippl will be initialized with " <<
99 omp_get_max_threads() << " OMP threads\n");
100
101 if ( provided != MPI_THREAD_FUNNELED )
102 ERRORMSG("CommMPI: Didn't get requested MPI-OpenMP setting.\n");
103#else
104 MPI_Init(&argc, &argv);
105#endif
106 }
107 //else
108 // INFOMSG("NOT initializing MPI = " << endl);
109
110 // restore original executable name without absolute path
111 strcpy(argv[0],execname);
112
113 free(execname);
114
115 // duplicate the MPI_COMM_WORLD communicator, so that we can use
116 // a communicator that will not conflict with other users of MPI_COMM_WORLD
117 MPI_Comm_dup(mpicomm, &communicator);
118
119 // determine the number of nodes running and my node number
120 MPI_Comm_size(communicator,&TotalNodes);
121 MPI_Comm_rank(communicator,&myHost);
122
123 // make sure we do not have too many processes running
124 if (procs > 0 && procs < TotalNodes)
125 {
126 // if this is a process that is beyond what we had requested, just exit
127 if (myHost >= procs)
128 Ippl::abort();
129 TotalNodes = procs;
130 }
131
132 MPI_Type_size ( MPI_INT, &size_of_MPI_INT );
133 if (myHost == 0) // this code is run by the master process
134 {
135 // send a messages to each child node
136 for (i = 1; i < TotalNodes; i++)
137 {
138 MPI_Send(&myHost, 1, MPI_INT, i, COMM_HOSTS_TAG, communicator);
139 }
140
141 // wait for the spawned processes to report back that they're ready
142 //~ int *child_ready = new int[TotalNodes];
143 std::vector<int> child_ready(TotalNodes);
144 for (i = 0; i < TotalNodes; child_ready[i++] = 0)
145 ;
146 INFOMSG(level5 << "CommMPI: Parent process waiting for children ..." << endl);
147 reported = 1; // since the parent is already ready
148 while (reported < TotalNodes)
149 {
150 ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
152
153 if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host]))
154 {
155 child_ready[rep_host] = 1;
156 reported++;
157 INFOMSG(level5 << "CommMPI: Child " << rep_host << " ready." << endl);
158 }
159 else
160 {
161 ERRORMSG("CommMPI: Error with child reporting to parent. ");
162 ERRORMSG("rep_host = " << rep_host);
163 ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
164 }
165 }
166
167 //~ delete [] child_ready;
168 INFOMSG(level5 << "CommMPI: Initialization complete." << endl);
169
170 }
171 else // this is a child process; get data from pops
172 {
173 char host_name[MPI_MAX_PROCESSOR_NAME];
174 ierror = MPI_Get_processor_name(host_name, &result_len);
175 if (ierror >= 0)
176 {
177 INFOMSG(level5 << "CommMPI: Started job " << myHost << " on host `");
178 INFOMSG(host_name << "'." << endl);
179 }
180 else
181 {
182 ERRORMSG("CommMPI: failed" << endl);
183 }
184
185 // receive message from the master node
186 int checknode;
187 MPI_Recv(&checknode, 1, MPI_INT, 0, COMM_HOSTS_TAG, communicator,
188 &stat);
189
190 if (checknode != 0)
191 WARNMSG("CommMPI: Child received bad message during startup." << endl);
192
193 // send back an acknowledgement
194 MPI_Send(&myHost, 1, MPI_INT, 0, COMM_HOSTS_TAG, communicator);
195
196 }
197
198 // set up the contexts and processes arrays properly
199 if (TotalNodes > 1)
200 {
201 std::vector<int> proccount;
202 proccount.push_back(1);
203 for (i = 1; i < TotalNodes; i++)
204 {
205 Contexts.push_back(1);
206 Processes.push_back(proccount);
207 }
208 }
209
210}
211
212
214// class destructor
216{
217
218 int i, dieCode = 0;
219 MPI_Status stat;
220
221 // on all nodes, when running in parallel, get any extra messages not
222 // yet received
223 if (TotalNodes > 1)
224 {
225 int trial, node, tag;
226 Message *msg;
227 for (trial = 0; trial < 50000; ++trial)
228 {
229 do
230 {
231 node = COMM_ANY_NODE;
232 tag = COMM_ANY_TAG;
233 msg = myreceive(node, tag, COMM_SEND_TAG);
234 if (msg != 0 && tag != IPPL_ABORT_TAG && tag != IPPL_EXIT_TAG)
235 {
236 WARNMSG("CommMPI: Found extra message from node " << node);
237 WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
238 }
239 }
240 while (msg != 0);
241 }
242 }
243
244 // broadcast a message to all other nodes to tell them to quit
245 if (myNode() == 0)
246 {
247 // on master node, send out messages
248 for (i = 1; i < TotalNodes; i++)
249 {
250 MPI_Send(&dieCode, 1, MPI_INT, i, COMM_DIE_TAG, communicator);
251
252 }
253 }
254 else
255 {
256 // on client nodes, receive message
257 MPI_Recv(&dieCode, 1, MPI_INT, 0, COMM_DIE_TAG, communicator, &stat);
258
259 }
260
261 MPI_Barrier(communicator);
262 // delete the communicator we used
263 MPI_Comm_free(&communicator);
264
265 // if we did our own initialization, also do finalize operation. But
266 // if we did not initialize, skip the finalize as well
267
268 /*
269 ada: it make no sense to call finalize from the application,
270 because the IPPL destructor is called AFTER that, which causes:
271 "0032-151 MPI is already finalized in string, task number"
272 */
273 if (weInitialized)
274 MPI_Finalize();
275}
276
277
279// take the data from a Message object and pack it into the current send buf.
280// each message is packed in this order:
281// tag, sending node, number of items (3-int array)
282// type of item 1 (short)
283// size of item 1, in number of elements (int)
284// item 1 data (various)
285// ...
286// type of item N (short)
287// size of item N, in number of elements (int)
288// item N data (various)
289void *CommMPI::pack_message(Message *msg, int tag, int &buffsize, int node)
290{
291
292 // calculate size of buffer
293 buffsize = find_msg_length(*msg);
294
295 // allocate storage for buffer
296 void *pos = ((unsigned int) buffsize > PACKSIZE) ? makebuffer(buffsize) : mpipackbuf;
297
298 // pack message data and return the necessary pointer
299 fill_msg_buffer(pos, *msg, tag, buffsize, node);
300 return pos;
301}
302
303
305// send a message ... arguments are the Message itself, the
306// destination node, the 'user' tag, and the 'encoding' tag.
307// Messages should be sent via the underlying mechanism by using the
308// encoding tag (one of the COMM_ tags),
309// and should embed the information about what the user
310// tag is in the data sent between nodes. Return success.
bool CommMPI::mysend(Message *msg, int node, int tag, int etag)
{

    int errstat = (-1);
    int flag = false;
    MPI_Request request;
    MPI_Status status;

    // state for any incoming message we service while our own send is pending
    MPI_Status rec_status;
    int src_node, rec_node, rec_tag, rec_size, rec_utag, bufid, rec_flag = 0;
    Message* newmsg = NULL;

    // pack the message data into the buffer
    int size;
    void *outbuffer = pack_message(msg, tag, size, node);

    // send the message (non-blocking)
    // Inform dbgmsg("CommMPI", INFORM_ALL_NODES);
    // dbgmsg << "Sending MPI message of size " << size << " to node " << node;
    // dbgmsg << " with tag " << tag << "." << endl;

    //messaging "profiler"

    errstat = MPI_Isend(outbuffer, size, MPI_BYTE, node, etag,
                        communicator, &request);


    // until our send completes, keep probing for and receiving incoming
    // messages; this keeps progress going when nodes send to each other
    // simultaneously
    while (!flag)
    {
        if (!Ippl::retransmit())
        {
            // get info about messages to be received
            bufid = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, communicator,
                               &rec_flag, &rec_status);
            if ((bufid >= 0) && (rec_flag != 0) )
            {
                // a message is available to be received
                src_node = rec_status.MPI_SOURCE;
                rec_tag = rec_status.MPI_TAG;
                MPI_Get_count(&rec_status, MPI_BYTE, &rec_size);
                // dbgmsg<<"Receiving MPI message of size " << rec_size << " from node ";
                // dbgmsg << src_node << "." << endl;
                if ( (rec_size >= 0) && (rec_tag >= 0) && (src_node >= 0) )
                {
                    // message is a valid one, so malloc the output buffer
                    void *rec_buff = makebuffer(rec_size);

                    // blocking receive, unpack message
                    MPI_Recv(rec_buff, rec_size, MPI_BYTE, src_node, rec_tag,
                             communicator, &rec_status);

                    // unpack_message fills in rec_node / rec_utag from the
                    // embedded header
                    newmsg = unpack_message(rec_node, rec_utag, rec_buff);

                    // if there was an error unpacking, then the message had a problem
                    // and is invalid, so throw this one away
                    if (newmsg == 0)
                    {
                        // free up the buffer
                        cleanupMessage(rec_buff);

                    }
                    else
                    {
                        // tell the message to inform us when the buffer is finished
                        newmsg->useCommunicate(this, rec_buff);

                        // put message in my message queue
                        add_msg(newmsg, rec_node, rec_utag);
                    }

                    // reset other receive information
                    newmsg = NULL; // reset message pointer
                    rec_flag = 0; // reset receive flag
                }
            }
        }

        // check for completion of send
        MPI_Test(&request, &flag, &status);
    }

    // free up the send buffer (only if it was heap-allocated, i.e. larger
    // than the static scratch buffer)
    if ((unsigned int) size > PACKSIZE)
        freebuffer(outbuffer);

    // return the success of the operation
    return (errstat == 0);
}
400
401
403// receive a message from the given node and user tag. Return a NEW
404// Message object if a message arrives, or NULL if no message available.
405// node will be set to the node from which the message was sent.
406// tag will be set to the 'user tag' for that message.
407// etag is the 'encoding' tag, and must be one of the COMM_ tags.
408// Only message sent via the underlying mechanism with the
409// given etag are checked. When one is found, the user tag and sending
410// node are extracted from the sent data.
411// If node = COMM_ANY_NODE, checks for messages from any node.
412// If tag = COMM_ANY_TAG, checks for messages with any user tag.
Message *CommMPI::myreceive(int& node, int& tag, int etag)
{

    int bufid, size, checknode, checktag, flag = false;
    Message *newmsg = 0;
    MPI_Status stat;

    // out-of-range node values mean "receive from any source"
    checknode = (node < 0 || node >= TotalNodes ? MPI_ANY_SOURCE : node);
    checktag = etag;

    // get info about message
    bufid = MPI_Iprobe(checknode, checktag, communicator, &flag, &stat);
    if (bufid < 0)
    {
        // an error has occurred
        ERRORMSG("CommMPI: cannot receive msg from node " << checknode);
        ERRORMSG(", tag " << checktag << endl);

        // give up completely once MAX_MPI_ERRS consecutive errors accumulate
        if (MAX_MPI_ERRS > 0 && ++numErrors > MAX_MPI_ERRS)
        {
            ERRORMSG("Maximum number of MPI receive errors (" << numErrors);
            ERRORMSG(") exceeded. MPI is hosed!!" << endl);
            Ippl::abort();
        }
    }

    // if the message is actually available, see if we can get it now
    if (flag == true)
    {
        MPI_Get_count(&stat,MPI_BYTE,&size);
        if (size < 0)
        {
            ERRORMSG("CommMPI: received message has size " << size << endl);
        }
        else if ((stat.MPI_TAG != checktag) || (stat.MPI_TAG < 0))
        {
            ERRORMSG("CommMPI: received message with invalid tag ");
            ERRORMSG(stat.MPI_TAG << endl);
        }
        else if (stat.MPI_SOURCE < 0)
        {
            ERRORMSG("CommMPI: received message from invalid source ");
            ERRORMSG(stat.MPI_SOURCE << endl);
        }
        else
        {
            // use the actual source/tag reported by the probe
            checknode = stat.MPI_SOURCE;
            checktag = stat.MPI_TAG;

            // malloc the receive buffer
            void *outbuff = makebuffer(size);

            // blocking receive
            // Inform dbgmsg("CommMPI", INFORM_ALL_NODES);
            // dbgmsg << "Receiving MPI message of size " << size << " from node ";
            // dbgmsg << checknode << "." << endl;
            MPI_Recv(outbuff, size, MPI_BYTE, checknode, checktag,
                     communicator, &stat);

            // unpack_message fills in the caller's node/tag out-parameters
            // from the header embedded in the sent data
            newmsg = unpack_message(node, tag, outbuff);

            // if there was an error unpacking, then the message had a problem
            // and is invalid, so throw this one away
            if (newmsg == 0)
            {
                // free up the buffer
                cleanupMessage(outbuff);
            }
            else
            {
                // tell the message to notify us when its done with the buffer
                newmsg->useCommunicate(this, outbuff);
            }

            // zero out the count of MPI-specific errors
            numErrors = 0;
        }

    }

    // return the new Message, or NULL if no message available
    return newmsg;
}
496
497
499// Synchronize all processors (everybody waits for everybody
500// else to get here before returning to calling function).
501// Uses MPI barrier for all procs
503{
504 MPI_Barrier(communicator);
505}
506
507
// resend a message buffer that has been previously packed and copied
// into the provided buffer. Return success.
511bool CommMPI::resend(void *buf, int buffsize, int node, int etag)
512{
513
514 //Inform dbgmsg("CommMPI::resend", INFORM_ALL_NODES);
515 //dbgmsg << "About to resend buffer of size " << buffsize << " to node ";
516 //dbgmsg << node << " with etag = " << etag << endl;
517
518 // this will only work if we're sending to another node
519 PInsist(node != myNode(), "Can only retransmit to other nodes");
520
522
523 // send the buffer out
524 MPI_Request request;
525 int errstat = MPI_Isend(buf, buffsize, MPI_BYTE, node, etag,
526 communicator, &request);
527
528
529 int flag = false;
530 MPI_Status status;
531 while (!flag)
532 {
533 // check for completion of send
534 MPI_Test(&request, &flag, &status);
535 }
536
537 // return the success of the operation
538 return (errstat == 0);
539}
540
541
543// clean up after a Message has been used (called by Message).
545{
546 // need to free the allocated storage
547 freebuffer(d);
548}
549
550
551bool CommMPI::raw_send(void *data, int size, int node, int tag)
552{
554
555 return MPI_Send(data, size, MPI_BYTE, node, tag, communicator)
556 == MPI_SUCCESS;
557}
558
559MPI_Request CommMPI::raw_isend(void *data, int size, int node, int tag)
560{
561 MPI_Request request;
562
564
565 MPI_Isend(data, size, MPI_BYTE, node, tag, communicator, &request);
566 return request;
567}
568
569int CommMPI::raw_receive(char *data, int size, int &node, int &tag)
570{
571 if (node == COMM_ANY_NODE)
572 node = MPI_ANY_SOURCE;
573 if (tag == COMM_ANY_TAG)
574 tag = MPI_ANY_TAG;
575
576 MPI_Status stat;
577 MPI_Recv(data, size, MPI_BYTE, node, tag, communicator, &stat);
578
579 node = stat.MPI_SOURCE;
580 tag = stat.MPI_TAG;
581 int count;
582 MPI_Get_count(&stat, MPI_BYTE, &count);
583 return count;
584}
585
586MPI_Request CommMPI::raw_ireceive(char *buf, int size, int node, int tag)
587{
588 if (node == COMM_ANY_NODE)
589 node = MPI_ANY_SOURCE;
590 if (tag == COMM_ANY_TAG)
591 tag = MPI_ANY_TAG;
592
593 MPI_Request request;
594 MPI_Irecv(buf, size, MPI_BYTE, node, tag, communicator, &request);
595
596 return request;
597}
598
599int CommMPI::raw_probe_receive(char *&data, int &node, int &tag)
600{
601 if (node == COMM_ANY_NODE)
602 node = MPI_ANY_SOURCE;
603 if (tag == COMM_ANY_TAG)
604 tag = MPI_ANY_TAG;
605 MPI_Status stat;
606
607 MPI_Probe(node, tag, communicator, &stat);
608 int count;
609 MPI_Get_count(&stat, MPI_BYTE, &count);
610 if(count>0)
611 data = new char[count];
612 else
613 data = 0;
614 node = stat.MPI_SOURCE;
615 tag = stat.MPI_TAG;
616
617 MPI_Recv(data, count, MPI_BYTE, node, tag, communicator, &stat);
618
619 return count;
620}
#define MAX_MPI_ERRS
Definition: CommMPI.cpp:44
#define PACKSIZE
Definition: CommMPI.cpp:53
#define PSIZE
Definition: CommMPI.cpp:52
#define IPPL_EXIT_TAG
Definition: Tags.h:26
#define IPPL_ABORT_TAG
Definition: Tags.h:25
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
Inform & level5(Inform &inf)
Definition: Inform.cpp:49
#define PInsist(c, m)
Definition: PAssert.h:120
#define ERRORMSG(msg)
Definition: IpplInfo.h:350
#define INFOMSG(msg)
Definition: IpplInfo.h:348
#define WARNMSG(msg)
Definition: IpplInfo.h:349
CommMPI(int &argc, char **&argv, int procs=(-1), bool mpiinit=true, MPI_Comm mpicomm=MPI_COMM_WORLD)
Definition: CommMPI.cpp:67
virtual bool raw_send(void *, int size, int node, int tag)
Definition: CommMPI.cpp:551
bool weInitialized
Definition: CommMPI.h:99
virtual bool resend(void *buf, int size, int node, int etag)
Definition: CommMPI.cpp:511
MPI_Comm communicator
Definition: CommMPI.h:96
virtual MPI_Request raw_isend(void *, int size, int node, int tag)
Definition: CommMPI.cpp:559
virtual Message * myreceive(int &node, int &tag, int etag)
Definition: CommMPI.cpp:413
virtual void mybarrier(void)
Definition: CommMPI.cpp:502
virtual MPI_Request raw_ireceive(char *buf, int size, int node, int tag)
Definition: CommMPI.cpp:586
virtual ~CommMPI(void)
Definition: CommMPI.cpp:215
virtual int raw_probe_receive(char *&, int &node, int &tag)
Definition: CommMPI.cpp:599
virtual bool mysend(Message *, int node, int utag, int etag)
Definition: CommMPI.cpp:311
void * pack_message(Message *msg, int tag, int &buffsize, int node)
Definition: CommMPI.cpp:289
virtual void cleanupMessage(void *)
Definition: CommMPI.cpp:544
virtual int raw_receive(char *, int size, int &node, int &tag)
Definition: CommMPI.cpp:569
std::vector< int > Contexts
Definition: Communicate.h:264
static void * makebuffer(int size)
Definition: Communicate.h:339
static void freebuffer(void *buf)
Definition: Communicate.h:343
int find_msg_length(Message &)
std::vector< std::vector< int > > Processes
Definition: Communicate.h:265
bool add_msg(Message *, int, int)
Definition: Communicate.cpp:97
Message * unpack_message(int &node, int &tag, void *pos)
int myNode() const
Definition: Communicate.h:155
void fill_msg_buffer(void *, Message &, int, int, int)
void useCommunicate(Communicate *c, void *d)
Definition: Message.h:358
static void abort(const char *=0)
Definition: IpplInfo.cpp:616
static bool retransmit()
Definition: IpplInfo.h:195
static IpplMessageCounter & getInstance()