00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "Message/CommMPI.h"
00028 #include "Message/Message.h"
00029 #include "Utility/IpplInfo.h"
00030 #include "Utility/PAssert.h"
00031 #include "Profile/Profiler.h"
00032
00033
00034 #include <mpi.h>
00035
00036 #include <string.h>
00037 #include <stdlib.h>
00038 #include <unistd.h>
00039
00040
00041
00042
00043
// Number of consecutive receive errors tolerated by CommMPI::myreceive before
// it aborts the run.  A value <= 0 disables that check.
#define MAX_MPI_ERRS 500

// Receive errors counted since the last successfully received message.
static int numErrors = 0;

// Size in bytes of an MPI_INT, filled in by the CommMPI constructor through
// MPI_Type_size and used only as the message size passed to the TAU trace
// macros.
#if defined(IPPL_MPIXX)
static MPI_Aint size_of_MPI_INT;
#else
static int size_of_MPI_INT;
#endif

// Static scratch buffer used by pack_message for outgoing messages of at most
// PACKSIZE bytes.  PSIZE is the element count and PACKSIZE the byte count.
// PSIZE is parenthesised so it expands safely inside any expression
// (e.g. `x % PSIZE`).
#define PSIZE (1024*16)
#define PACKSIZE ((PSIZE)*sizeof(long))
static long mpipackbuf[PSIZE];
00059
00060
00061
00063
00064
00065
00066
00067
00068
00069
00070
00071 CommMPI::CommMPI(int& argc , char**& argv, int procs, bool mpiinit)
00072 : Communicate(argc, argv, procs), weInitialized(mpiinit)
00073 {
00074 TAU_PROFILE("CommMPI::CommMPI()", "void (int, char **, int)", TAU_MESSAGE);
00075 int i, reported, rep_host, ierror, result_len;
00076 MPI_Status stat;
00077 char *currtok, *nexttok, *execname;
00078
00079
00080 currtok = strstr(argv[0],"/");
00081 if (!currtok) {
00082 execname = strdup(argv[0]);
00083 } else {
00084 currtok++;
00085 nexttok = strstr(currtok,"/");
00086 while (nexttok) {
00087 currtok = nexttok+1;
00088 nexttok = strstr(currtok,"/");
00089 }
00090 execname = strdup(currtok);
00091 }
00092
00093
00094 if (weInitialized)
00095 MPI_Init(&argc, &argv);
00096
00097
00098 strcpy(argv[0],execname);
00099
00100
00101
00102 MPI_Comm_dup(MPI_COMM_WORLD, &communicator);
00103
00104
00105 MPI_Comm_size(communicator,&TotalNodes);
00106 MPI_Comm_rank(communicator,&myHost);
00107
00108
00109 if (procs > 0 && procs < TotalNodes) {
00110
00111 if (myHost >= procs)
00112 Ippl::abort();
00113 TotalNodes = procs;
00114 }
00115
00116 MPI_Type_size ( MPI_INT, &size_of_MPI_INT );
00117 if (myHost == 0) {
00118
00119 for (i = 1; i < TotalNodes; i++)
00120 {
00121 MPI_Send(&myHost, 1, MPI_INT, i, COMM_HOSTS_TAG, communicator);
00122 TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, i, 1*size_of_MPI_INT);
00123 }
00124
00125
00126 int *child_ready = new int[TotalNodes];
00127 for (i = 0; i < TotalNodes; child_ready[i++] = 0);
00128 INFOMSG("CommMPI: Parent process waiting for children ..." << endl);
00129 reported = 1;
00130 while (reported < TotalNodes) {
00131 ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
00132 COMM_HOSTS_TAG, communicator, &stat);
00133 TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, stat.MPI_SOURCE, 1*size_of_MPI_INT);
00134 if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host])) {
00135 child_ready[rep_host] = 1;
00136 reported++;
00137 INFOMSG("CommMPI: Child " << rep_host << " ready." << endl);
00138 }
00139 else {
00140 ERRORMSG("CommMPI: Error with child reporting to parent. ");
00141 ERRORMSG("rep_host = " << rep_host);
00142 ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
00143 }
00144 }
00145
00146 delete [] child_ready;
00147 INFOMSG("CommMPI: Initialization complete." << endl);
00148
00149 }
00150 else {
00151 char host_name[MPI_MAX_PROCESSOR_NAME];
00152 ierror = MPI_Get_processor_name(host_name, &result_len);
00153 if (ierror >= 0) {
00154 INFOMSG("CommMPI: Started job " << myHost << " on host `");
00155 INFOMSG(host_name << "'." << endl);
00156 }
00157 else {
00158 ERRORMSG("CommMPI: failed" << endl);
00159 }
00160
00161
00162 int checknode;
00163 MPI_Recv(&checknode, 1, MPI_INT, 0, COMM_HOSTS_TAG, communicator,
00164 &stat);
00165 TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, 0, 1*size_of_MPI_INT);
00166 if (checknode != 0)
00167 WARNMSG("CommMPI: Child received bad message during startup." << endl);
00168
00169
00170 MPI_Send(&myHost, 1, MPI_INT, 0, COMM_HOSTS_TAG, communicator);
00171 TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, 0, 1*size_of_MPI_INT);
00172 }
00173
00174
00175 if (TotalNodes > 1) {
00176 vector<int> proccount;
00177 proccount.push_back(1);
00178 for (i = 1; i < TotalNodes; i++) {
00179 Contexts.push_back(1);
00180 Processes.push_back(proccount);
00181 }
00182 }
00183
00184 }
00185
00186
00188
00189 CommMPI::~CommMPI(void)
00190 {
00191 TAU_PROFILE("CommMPI::~CommMPI()", "void()", TAU_MESSAGE);
00192 int i, dieCode = 0;
00193 MPI_Status stat;
00194
00195
00196
00197 if (TotalNodes > 1) {
00198 int trial, node, tag;
00199 Message *msg;
00200 for (trial = 0; trial < 50000; ++trial) {
00201 do {
00202 node = COMM_ANY_NODE;
00203 tag = COMM_ANY_TAG;
00204 msg = myreceive(node, tag, COMM_SEND_TAG);
00205 if (msg != 0 && tag != IPPL_ABORT_TAG && tag != IPPL_EXIT_TAG) {
00206 WARNMSG("CommMPI: Found extra message from node " << node);
00207 WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
00208 }
00209 } while (msg != 0);
00210 }
00211 }
00212
00213
00214 if (myNode() == 0) {
00215
00216 for (i = 1; i < TotalNodes; i++)
00217 {
00218 MPI_Send(&dieCode, 1, MPI_INT, i, COMM_DIE_TAG, communicator);
00219 TAU_TRACE_SENDMSG(COMM_DIE_TAG, i, 1*size_of_MPI_INT);
00220 }
00221 } else {
00222
00223 MPI_Recv(&dieCode, 1, MPI_INT, 0, COMM_DIE_TAG, communicator, &stat);
00224 TAU_TRACE_RECVMSG(COMM_DIE_TAG, 0, 1*size_of_MPI_INT);
00225 }
00226
00227
00228 MPI_Comm_free(&communicator);
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238 MPI_Finalize();
00239 }
00240
00241
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253 void *CommMPI::pack_message(Message *msg, int tag, int &buffsize, int node) {
00254 TAU_PROFILE("CommMPI::pack_message()", "(Message *, int, int, int)",
00255 TAU_MESSAGE);
00256
00257 buffsize = find_msg_length(*msg);
00258
00259
00260 void *pos = (buffsize > PACKSIZE) ? makebuffer(buffsize) : mpipackbuf;
00261
00262
00263 fill_msg_buffer(pos, *msg, tag, buffsize, node);
00264 return pos;
00265 }
00266
00267
00269
00270
00271
00272
00273
00274
00275 bool CommMPI::mysend(Message *msg, int node, int tag, int etag) {
00276 TAU_PROFILE("CommMPI::mysend()", "bool (Message *, int, int, int)",
00277 TAU_MESSAGE);
00278
00279 int nitems = msg->size();
00280 int errstat = (-1);
00281 int flag = false;
00282 MPI_Request request;
00283 MPI_Status status;
00284
00285 MPI_Status rec_status;
00286 int src_node, rec_node, rec_tag, rec_size, rec_utag, bufid, rec_flag = 0;
00287 Message* newmsg = NULL;
00288
00289
00290 int size;
00291 void *outbuffer = pack_message(msg, tag, size, node);
00292
00293
00294 Inform dbgmsg("CommMPI", INFORM_ALL_NODES);
00295
00296
00297 errstat = MPI_Isend(outbuffer, size, MPI_BYTE, node, etag,
00298 communicator, &request);
00299 TAU_TRACE_SENDMSG(etag, node, size);
00300
00301 while (!flag) {
00302 if (!Ippl::retransmit()) {
00303
00304 bufid = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, communicator,
00305 &rec_flag, &rec_status);
00306 if ((bufid >= 0) && (rec_flag != 0) ) {
00307
00308 src_node = rec_status.MPI_SOURCE;
00309 rec_tag = rec_status.MPI_TAG;
00310 MPI_Get_count(&rec_status, MPI_BYTE, &rec_size);
00311
00312
00313 if ( (rec_size >= 0) && (rec_tag >= 0) && (src_node >= 0) ) {
00314
00315 void *rec_buff = makebuffer(rec_size);
00316
00317
00318 MPI_Recv(rec_buff, rec_size, MPI_BYTE, src_node, rec_tag,
00319 communicator, &rec_status);
00320 TAU_TRACE_RECVMSG(rec_tag, src_node, rec_size);
00321 newmsg = unpack_message(rec_node, rec_utag, rec_buff);
00322
00323
00324
00325 if (newmsg == 0) {
00326
00327 cleanupMessage(rec_buff);
00328
00329 } else {
00330
00331 newmsg->useCommunicate(this, rec_buff);
00332
00333
00334 add_msg(newmsg, rec_node, rec_utag);
00335 }
00336
00337
00338 newmsg = NULL;
00339 rec_flag = 0;
00340 }
00341 }
00342 }
00343
00344
00345 MPI_Test(&request, &flag, &status);
00346 }
00347
00348
00349 if (size > PACKSIZE)
00350 freebuffer(outbuffer);
00351
00352
00353 return (errstat == 0);
00354 }
00355
00356
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368 Message *CommMPI::myreceive(int& node, int& tag, int etag) {
00369 TAU_PROFILE("CommMPI::myreceive()", "Message *(int, int, int)", TAU_MESSAGE);
00370 int bufid, size, checknode, checktag, flag = false;
00371 Message *newmsg = 0;
00372 MPI_Status stat;
00373
00374 checknode = (node < 0 || node >= TotalNodes ? MPI_ANY_SOURCE : node);
00375 checktag = etag;
00376
00377
00378 bufid = MPI_Iprobe(checknode, checktag, communicator, &flag, &stat);
00379 if (bufid < 0) {
00380
00381 ERRORMSG("CommMPI: cannot receive msg from node " << checknode);
00382 ERRORMSG(", tag " << checktag << endl);
00383
00384 if (MAX_MPI_ERRS > 0 && ++numErrors > MAX_MPI_ERRS) {
00385 ERRORMSG("Maximum number of MPI receive errors (" << numErrors);
00386 ERRORMSG(") exceeded. MPI is hosed!!" << endl);
00387 Ippl::abort();
00388 }
00389 }
00390
00391
00392 if (flag == true) {
00393 MPI_Get_count(&stat,MPI_BYTE,&size);
00394 if (size < 0) {
00395 ERRORMSG("CommMPI: received message has size " << size << endl);
00396 } else if ((stat.MPI_TAG != checktag) || (stat.MPI_TAG < 0)) {
00397 ERRORMSG("CommMPI: received message with invalid tag ");
00398 ERRORMSG(stat.MPI_TAG << endl);
00399 } else if (stat.MPI_SOURCE < 0) {
00400 ERRORMSG("CommMPI: received message from invalid source ");
00401 ERRORMSG(stat.MPI_SOURCE << endl);
00402 } else {
00403 checknode = stat.MPI_SOURCE;
00404 checktag = stat.MPI_TAG;
00405
00406
00407 void *outbuff = makebuffer(size);
00408
00409
00410
00411
00412
00413 MPI_Recv(outbuff, size, MPI_BYTE, checknode, checktag,
00414 communicator, &stat);
00415 TAU_TRACE_RECVMSG(checktag, checknode, size);
00416 newmsg = unpack_message(node, tag, outbuff);
00417
00418
00419
00420 if (newmsg == 0) {
00421
00422 cleanupMessage(outbuff);
00423 } else {
00424
00425 newmsg->useCommunicate(this, outbuff);
00426 }
00427
00428
00429 numErrors = 0;
00430 }
00431
00432 } else {
00433
00434 DEBUGMSG(level2<<"CommMPI: No Message Received to Match Request"<<endl);
00435 }
00436
00437
00438 return newmsg;
00439 }
00440
00441
00443
00444
00445
00446 void CommMPI::mybarrier(void) {
00447 TAU_PROFILE("CommMPI::mybarrier()", "void ()", TAU_MESSAGE);
00448
00449 MPI_Barrier(communicator);
00450 }
00451
00452
00454
00455
00456 bool CommMPI::resend(void *buf, int buffsize, int node, int etag) {
00457 TAU_PROFILE("CommMPI::resend()", "void (void *, int, int, int)",
00458 TAU_MESSAGE);
00459
00460
00461
00462
00463
00464
00465 PInsist(node != myNode(), "Can only retransmit to other nodes");
00466
00467
00468 MPI_Request request;
00469 int errstat = MPI_Isend(buf, buffsize, MPI_BYTE, node, etag,
00470 communicator, &request);
00471 TAU_TRACE_SENDMSG(etag, node, size);
00472
00473 int flag = false;
00474 MPI_Status status;
00475 while (!flag) {
00476
00477 MPI_Test(&request, &flag, &status);
00478 }
00479
00480
00481 return (errstat == 0);
00482 }
00483
00484
00486
00487 void CommMPI::cleanupMessage(void *d) {
00488 TAU_PROFILE("CommMPI::cleanupMessage()", "void (void *)", TAU_MESSAGE);
00489
00490
00491 freebuffer(d);
00492 }
00493
00494
00495
00496
00497
00498
00499