OPAL (Object Oriented Parallel Accelerator Library)  2.2.0
OPAL
CommPM.cpp
Go to the documentation of this file.
1 // -*- C++ -*-
2 /***************************************************************************
3  *
4  * The IPPL Framework
5  *
6  * This program was prepared by PSI.
7  * All rights in the program are reserved by PSI.
8  * Neither PSI nor the author(s)
9  * makes any warranty, express or implied, or assumes any liability or
10  * responsibility for the use of this software
11  *
12  * Visit www.amas.web.psi for more details
13  *
14  ***************************************************************************/
15 
16 // -*- C++ -*-
17 /***************************************************************************
18  *
19  * The IPPL Framework
20  *
21  *
22  * Visit http://people.web.psi.ch/adelmann/ for more details
23  *
24  ***************************************************************************/
25 
26 // include files
27 #include "Message/CommPM.h"
28 #include "Message/Message.h"
29 #include "Utility/IpplInfo.h"
30 
31 
32 // include mpi header file
33 #include <mpi.h>
34 
35 // include PM header file. need to invoke PM library.
36 // #include <Pm.h>
37 // define interface directly, instead of using Pm.h.
38 // (because KCC couldn't parse Pm.h)
39 // The following definitions are written in Pm.h
40 typedef void* pmCtx;
41 extern "C"
42 {
43  int _pmGetSendBuf(pmCtx pmc, caddr_t *bufp, size_t length);
44  int _pmSend(pmCtx pmc, int dst_node);
45  int _pmSendDone(pmCtx pmc);
46  int _pmReceive(pmCtx pmc, caddr_t *bufp, size_t *length);
47  int _pmPutReceiveBuf(pmCtx pmc);
48  extern pmCtx _pm_subnet[];
49  extern int _pm_subnet_count;
50 };
51 #define PM_MTU (8192 + 32)
52 
53 // include score header file. need to invoke sub-network functions.
54 #include <score.h>
55 
56 #include <cstring>
57 #include <cstdlib>
58 #include <unistd.h>
59 
60 // if an error occurs during myreceive more times than this, CommPM
61 // will just exit. Make it negative to totally disable checking for a
62 // maximum number of errors
63 #define MAX_MPI_ERRS 500
64 
65 
66 // static data to keep track of errors
67 static int size_of_MPI_INT; /* needed for tracing */
68 
69 // sub-network for message passing using PM directly.
70 static pmCtx pm_network;
71 
73 // constructor. arguments: command-line args, and number of processes
74 // to start (if < 0, start the 'default' number, i.e. the number of
75 // hosts in a MPI virtual machine, the number of nodes in an O2K, etc)
76 // Note: The base-class constructor does not need the argument info or
77 // the number of nodes, it just by default sets the number of nodes=1
78 CommPM::CommPM(int& argc , char**& argv, int procs)
79  : Communicate(argc, argv, procs)
80 {
81 
82  int i, reported, rep_host, ierror, result_len;
83  MPI_Status stat;
84  char *currtok, *nexttok, *execname;
85 
86  // a little "string magic" to strip the absolute pathname off the executable
87  currtok = strstr(argv[0],"/");
88  if (!currtok)
89  {
90  execname = strdup(argv[0]);
91  }
92  else
93  {
94  currtok++;
95  nexttok = strstr(currtok,"/");
96  while (nexttok)
97  {
98  currtok = nexttok+1;
99  nexttok = strstr(currtok,"/");
100  }
101  execname = strdup(currtok);
102  }
103 
104  // allocate a sub-network for message passing using PM directly.
105  if (_score_alloc_subnet())
106  {
107  ERRORMSG("CommPM: Error with allocating sub-network.");
108  }
109  pm_network = _pm_subnet[_pm_subnet_count - 1];
110 
111  // initialize mpi
112  MPI_Init(&argc, &argv);
113 
114  // restore original executable name without absolute path
115  strcpy(argv[0],execname);
116 
117  // determine the number of nodes running and my node number
118  MPI_Comm_size(MPI_COMM_WORLD,&TotalNodes);
119  MPI_Comm_rank(MPI_COMM_WORLD,&myHost);
120 
121  // make sure we do not have too many processes running
122  if (procs > 0 && procs < TotalNodes)
123  {
124  // if this is a process that is beyond what we had requested, just exit
125  if (myHost >= procs)
126  Ippl::abort();
127  TotalNodes = procs;
128  }
129 
130  MPI_Type_size ( MPI_INT, (MPI_Aint*)&size_of_MPI_INT );
131  if (myHost == 0) // this code is run by the master process
132  {
133  // send a messages to each child node
134  for (i = 1; i < TotalNodes; i++)
135  {
136  MPI_Send(&myHost, 1, MPI_INT, i, COMM_HOSTS_TAG, MPI_COMM_WORLD);
137 
138  }
139 
140  // wait for the spawned processes to report back that they're ready
141  //~ int *child_ready = new int[TotalNodes];
142  std::vector<int> child_ready(TotalNodes);
143  for (i = 0; i < TotalNodes; child_ready[i++] = 0);
144  INFOMSG("CommPM: Parent process waiting for children ..." << endl);
145  reported = 1; // since the parent is already ready
146  while (reported < TotalNodes)
147  {
148  ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
149  COMM_HOSTS_TAG, MPI_COMM_WORLD, &stat);
150 
151  if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host]))
152  {
153  child_ready[rep_host] = 1;
154  reported++;
155  INFOMSG("CommPM: Child " << rep_host << " ready." << endl);
156  }
157  else
158  {
159  ERRORMSG("CommPM: Error with child reporting to parent. ");
160  ERRORMSG("rep_host = " << rep_host);
161  ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
162  }
163  }
164 
165  //~ delete [] child_ready;
166  INFOMSG("CommPM: Initialization complete." << endl);
167 
168  }
169  else // this is a child process; get data from pops
170  {
171  char host_name[MPI_MAX_PROCESSOR_NAME];
172  ierror = MPI_Get_processor_name(host_name, &result_len);
173  if (ierror >= 0)
174  {
175  INFOMSG("CommPM: Started job " << myHost << " on host `");
176  INFOMSG(host_name << "'." << endl);
177  }
178  else
179  {
180  ERRORMSG("CommPM: failed" << endl);
181  }
182 
183  // receive message from the master node
184  int checknode;
185  MPI_Recv(&checknode, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD,
186  &stat);
187 
188  if (checknode != 0)
189  WARNMSG("CommPM: Child received bad message during startup." << endl);
190 
191  // send back an acknowledgement
192  MPI_Send(&myHost, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD);
193 
194  }
195 
196  // set up the contexts and processes arrays properly
197  if (TotalNodes > 1)
198  {
199  vector<int> proccount;
200  proccount.push_back(1);
201  for (i = 1; i < TotalNodes; i++)
202  {
203  Contexts.push_back(1);
204  Processes.push_back(proccount);
205  }
206  }
207 
208 }
209 
210 
212 // class destructor
214 {
215 
216  int i, dieCode = 0;
217  MPI_Status stat;
218 
219  // on all nodes, when running in parallel, get any extra messages not
220  // yet received
221  if (TotalNodes > 1)
222  {
223  int trial, node, tag;
224  Message *msg;
225  for (trial = 0; trial < 50000; ++trial)
226  {
227  do
228  {
229  node = COMM_ANY_NODE;
230  tag = COMM_ANY_TAG;
231  msg = myreceive(node, tag, COMM_SEND_TAG);
232  if (msg != 0)
233  {
234  WARNMSG("CommPM: Found extra message from node " << node);
235  WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
236  }
237  }
238  while (msg != 0);
239  }
240  }
241 
242  // broadcast a message to all other nodes to tell them to quit
243  if (myNode() == 0)
244  {
245  // on master node, send out messages
246  for (i = 1; i < TotalNodes; i++)
247  {
248  MPI_Send(&dieCode, 1, MPI_INT, i, COMM_DIE_TAG, MPI_COMM_WORLD);
249 
250  }
251  }
252  else
253  {
254  // on client nodes, receive message
255  MPI_Recv(&dieCode, 1, MPI_INT, 0, COMM_DIE_TAG, MPI_COMM_WORLD, &stat);
256 
257  }
258 
259  // release sub-network
260  _score_free_subnet();
261 
262  MPI_Finalize();
263 }
264 
265 
// Fixed header that mysend() writes at the start of every PM packet and that
// myreceive()/pickup_message() read back. The packed Message payload starts
// immediately after `tag`, and the packet length is the payload length plus
// sizeof(PM_Message). The struct must therefore stay exactly two ints with
// no padding.
struct PM_Message
{
    int node;   // sending node (mysend stores myHost here)
    int tag;    // user tag of the message
};
271 
#if 0
// Legacy packing helper -- compiled out.
// It stores the packed length of *msg in buffsize. It then uses the shared
// mpipackbuf scratch area, or a freshly allocated buffer when the message
// exceeds PACKSIZE, and serializes the message with `tag` into it.
// Packing order written by fill_msg_buffer:
//   tag, sending node, number of items (3-int array)
//   then, per item: type (short), element count (int), item data.
// NOTE(review): mysend() packs straight into the PM send buffer instead, so
// nothing uses this. PACKSIZE and mpipackbuf are not defined in this file
// (PACKSIZE lives in CommMPI.cpp).
void *CommPM::pack_message(Message *msg, int tag, int &buffsize)
{
    buffsize = find_msg_length(*msg);

    void *pos = mpipackbuf;
    if (buffsize > PACKSIZE)
        pos = makebuffer(buffsize);

    fill_msg_buffer(pos, *msg, tag);
    return pos;
}
#endif
299 // send a message ... arguments are the Message itself, the
300 // destination node, the 'user' tag, and the 'encoding' tag.
301 // Messages should be sent via the underlying mechanism by using the
302 // encoding tag (one of the COMM_ tags),
303 // and should embed the information about what the user
304 // tag is in the data sent between nodes. Return success.
305 bool CommPM::mysend(Message *msg, int node, int tag, int etag)
306 {
307 
308  int nerr = 0;
309 
310  // printf("mysend\n");
311 
312  // calculate size of buffer
313  int length;
314  length = find_msg_length(*msg) + sizeof(PM_Message);
315 
316  // printf("length = %d\n", length);
317 
318  // allocate storage for buffer
319  PM_Message* msgbuf;
320  int timeout_counter = 0;
321  while (_pmGetSendBuf(pm_network, (caddr_t*)&msgbuf, length) == ENOBUFS)
322  {
323  timeout_counter++;
324  if (timeout_counter > 10000000)
325  {
326  ERRORMSG("CommPM: _pmGetSendBuf TIMEOUT");
327  timeout_counter = 0;
328  nerr++;
329  }
330  // receive a message when the buffer couldn't be allocated.
331  nerr += pickup_message();
332  }
333 
334  // printf("buff %x is allocated\n", msgbuf);
335  // pack message
336  msgbuf->node = myHost;
337  msgbuf->tag = tag;
338  // printf("invoke fill_msg_buffer\n");
339  fill_msg_buffer((void*) (&(msgbuf->tag) + 1), *msg, tag);
340  // printf("fill_msg_buffer done\n");
341 
342  // send message
343  if (_pmSend(pm_network, node))
344  {
345  ERRORMSG("CommPM: _pmSend Error");
346  nerr++;
347  }
348  // receive message waiting the sending done.
349  while (_pmSendDone(pm_network) == EBUSY)
350  {
351  nerr += pickup_message();
352  }
353 
354  // printf("mysend done\n");
355  // return the succsess of the operation
356  return (nerr == 0);
357 }
358 
360 {
361  int nerr = 0;
362  int length;
363  PM_Message* msgbuf;
364  int error;
365  Message* newmsg = 0;
366  // int rec_tag;
367  int src_node, rec_size, rec_utag;
368  void* rec_buff;
369 
370  // pickup message
371  if (error = _pmReceive(pm_network, (caddr_t*)&msgbuf, (size_t*)&length))
372  {
373  // no message is received
374  if (error != ENOBUFS)
375  {
376  ERRORMSG("CommPM: _pmReceive Error (in pickup_message)");
377  nerr++;
378  }
379  }
380  else
381  {
382  // a message is received, unpack it
383 
384  src_node = msgbuf->node;
385  // rec_tag = msgbuf->tag;
386  rec_size = length - sizeof(PM_Message);
387  rec_buff = makebuffer(rec_size);
388  // bcopy((void*) (&(msgbuf->tag) + 1), rec_buff, rec_size);
389  memcpy(rec_buff, (void*) (&(msgbuf->tag) + 1), rec_size);
390  _pmPutReceiveBuf(pm_network);
391  newmsg = unpack_message(src_node, rec_utag, rec_buff);
392  newmsg->useCommunicate(this, rec_buff);
393 
394  // put message in my message queue
395  if (add_msg(newmsg, src_node, rec_utag))
396  {
397  newmsg = NULL; // reset message pointer
398  }
399  }
400 
401  // return the number of errors;
402  return nerr;
403 }
404 
405 
407 // receive a message from the given node and user tag. Return a NEW
408 // Message object if a message arrives, or NULL if no message available.
409 // node will be set to the node from which the message was sent.
410 // tag will be set to the 'user tag' for that message.
411 // etag is the 'encoding' tag, and must be one of the COMM_ tags.
412 // Only message sent via the underlying mechanism with the
413 // given etag are checked. When one is found, the user tag and sending
414 // node are extracted from the sent data.
415 // If node = COMM_ANY_NODE, checks for messages from any node.
416 // If tag = COMM_ANY_TAG, checks for messages with any user tag.
417 Message *CommPM::myreceive(int& node, int& tag, int etag)
418 {
419 
420  int error;
421  PM_Message* msgbuf;
422  int length;
423  int rec_tag, src_node, rec_size, rec_utag;
424  void* rec_buff;
425  Message* newmsg = 0;
426 
427  // pickup message
428  if (error = _pmReceive(pm_network, (caddr_t*)&msgbuf, (size_t*)&length))
429  {
430  // no message is received
431  if (error != ENOBUFS)
432  {
433  ERRORMSG("CommPM: _pmReceive Error (in myreceive)");
434  }
435  else
436  {
437  // no message is available
438  DEBUGMSG(level2<<"CommPM: No Message Received to Match Request"<<endl);
439  }
440  }
441  else
442  {
443  // a message is received, unpack it
444 
445  src_node = msgbuf->node;
446  rec_tag = msgbuf->tag;
447  rec_size = length - sizeof(PM_Message);
448  rec_buff = makebuffer(rec_size);
449  // bcopy((void*) (&(msgbuf->tag) + 1), rec_buff, rec_size);
450  memcpy(rec_buff, (void*) (&(msgbuf->tag) + 1), rec_size);
451  _pmPutReceiveBuf(pm_network);
452  newmsg = unpack_message(src_node, rec_utag, rec_buff);
453  newmsg->useCommunicate(this, rec_buff);
454  }
455 
456  // Retrun the new Message, or NULL if no message available
457  node = src_node;
458  tag = rec_tag;
459  return newmsg;
460 }
461 
462 
464 // Synchronize all processors (everybody waits for everybody
465 // else to get here before returning to calling function).
466 // Uses MPI barrier for all procs
468 {
469 
470 
471  MPI_Barrier(MPI_COMM_WORLD);
472 }
473 
474 
476 // clean up after a Message has been used (called by Message).
478 {
479 
480 
481  // need to free the allocated storage
482  freebuffer(d);
483 }
static void abort(const char *=0, int exitcode=(-1))
Definition: IpplInfo.cpp:696
virtual bool mysend(Message *, int node, int utag, int etag)
Definition: CommPM.cpp:305
CommPM(int &argc, char **&argv, int procs=(-1))
Definition: CommPM.cpp:78
bool add_msg(Message *, int, int)
Definition: Communicate.cpp:98
#define PACKSIZE
Definition: CommMPI.cpp:63
int myNode() const
Definition: Communicate.h:155
void * pmCtx
Definition: CommPM.cpp:40
void fill_msg_buffer(void *, Message &, int, int, int)
#define ERRORMSG(msg)
Definition: IpplInfo.h:399
std::vector< std::vector< int > > Processes
Definition: Communicate.h:265
int _pmGetSendBuf(pmCtx pmc, caddr_t *bufp, size_t length)
std::vector< int > Contexts
Definition: Communicate.h:264
int _pmSendDone(pmCtx pmc)
int find_msg_length(Message &)
int pickup_message(void)
Definition: CommPM.cpp:359
Inform & level2(Inform &inf)
Definition: Inform.cpp:46
int _pmPutReceiveBuf(pmCtx pmc)
int node
Definition: CommPM.cpp:268
int _pm_subnet_count
int _pmSend(pmCtx pmc, int dst_node)
int _pmReceive(pmCtx pmc, caddr_t *bufp, size_t *length)
#define INFOMSG(msg)
Definition: IpplInfo.h:397
static void * makebuffer(int size)
Definition: Communicate.h:339
pmCtx _pm_subnet[]
#define WARNMSG(msg)
Definition: IpplInfo.h:398
void useCommunicate(Communicate *c, void *d)
Definition: Message.h:366
static void freebuffer(void *buf)
Definition: Communicate.h:343
virtual void cleanupMessage(void *)
Definition: CommPM.cpp:477
virtual ~CommPM(void)
Definition: CommPM.cpp:213
virtual Message * myreceive(int &node, int &tag, int etag)
Definition: CommPM.cpp:417
Message * unpack_message(int &node, int &tag, void *pos)
#define DEBUGMSG(msg)
Definition: IpplInfo.h:405
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
virtual void mybarrier(void)
Definition: CommPM.cpp:467