00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "Message/CommPM.h"
00028 #include "Message/Message.h"
00029 #include "Utility/IpplInfo.h"
00030 #include "Profile/Profiler.h"
00031
00032
00033 #include <mpi.h>
00034
00035
00036
00037
00038
00039
// Handle type for a PM communication context; this file only ever passes
// it through to the _pm* routines declared below.
typedef void* pmCtx;

// PM routines and globals used by this file, declared with C linkage.
// The integer results are status codes: the callers in this file treat 0
// as success and compare against ENOBUFS / EBUSY for "try again" cases.
extern "C" {
  // Obtain a send buffer of 'length' bytes; its address is stored in *bufp.
  int _pmGetSendBuf(pmCtx pmc, caddr_t *bufp, size_t length);

  // Send the buffer obtained from _pmGetSendBuf to node 'dst_node'.
  int _pmSend(pmCtx pmc, int dst_node);

  // Polled after _pmSend until it stops returning EBUSY.
  int _pmSendDone(pmCtx pmc);

  // Poll for an incoming packet; on success *bufp and *length describe it.
  int _pmReceive(pmCtx pmc, caddr_t *bufp, size_t *length);

  // Hand the most recently received buffer back once it has been copied.
  int _pmPutReceiveBuf(pmCtx pmc);

  // Table of allocated sub-network contexts and the number of entries.
  extern pmCtx _pm_subnet[];
  extern int   _pm_subnet_count;
}

// Packet size constant (8192 + 32 bytes).  Not referenced in the part of
// the file visible here.
#define PM_MTU (8192 + 32)
00051
00052
00053 #include <score.h>
00054
00055 #include <string.h>
00056 #include <stdlib.h>
00057 #include <unistd.h>
00058
00059
00060
00061
00062 #define MAX_MPI_ERRS 500
00063
00064
00065
00066 static int size_of_MPI_INT;
00067
00068
00069 static pmCtx pm_network;
00070
00072
00073
00074
00075
00076
00077 CommPM::CommPM(int& argc , char**& argv, int procs)
00078 : Communicate(argc, argv, procs) {
00079 TAU_PROFILE("CommPM::CommPM()", "void (int, char **, int)", TAU_MESSAGE);
00080 int i, reported, rep_host, ierror, result_len;
00081 MPI_Status stat;
00082 char *currtok, *nexttok, *execname;
00083
00084
00085 currtok = strstr(argv[0],"/");
00086 if (!currtok) {
00087 execname = strdup(argv[0]);
00088 } else {
00089 currtok++;
00090 nexttok = strstr(currtok,"/");
00091 while (nexttok) {
00092 currtok = nexttok+1;
00093 nexttok = strstr(currtok,"/");
00094 }
00095 execname = strdup(currtok);
00096 }
00097
00098
00099 if (_score_alloc_subnet()) {
00100 ERRORMSG("CommPM: Error with allocating sub-network.");
00101 }
00102 pm_network = _pm_subnet[_pm_subnet_count - 1];
00103
00104
00105 MPI_Init(&argc, &argv);
00106
00107
00108 strcpy(argv[0],execname);
00109
00110
00111 MPI_Comm_size(MPI_COMM_WORLD,&TotalNodes);
00112 MPI_Comm_rank(MPI_COMM_WORLD,&myHost);
00113
00114
00115 if (procs > 0 && procs < TotalNodes) {
00116
00117 if (myHost >= procs)
00118 Ippl::abort();
00119 TotalNodes = procs;
00120 }
00121
00122 MPI_Type_size ( MPI_INT, (MPI_Aint*)&size_of_MPI_INT );
00123 if (myHost == 0) {
00124
00125 for (i = 1; i < TotalNodes; i++)
00126 {
00127 MPI_Send(&myHost, 1, MPI_INT, i, COMM_HOSTS_TAG, MPI_COMM_WORLD);
00128 TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, i, 1*size_of_MPI_INT);
00129 }
00130
00131
00132 int *child_ready = new int[TotalNodes];
00133 for (i = 0; i < TotalNodes; child_ready[i++] = 0);
00134 INFOMSG("CommPM: Parent process waiting for children ..." << endl);
00135 reported = 1;
00136 while (reported < TotalNodes) {
00137 ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
00138 COMM_HOSTS_TAG, MPI_COMM_WORLD, &stat);
00139 TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, stat.MPI_SOURCE, 1*size_of_MPI_INT);
00140 if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host])) {
00141 child_ready[rep_host] = 1;
00142 reported++;
00143 INFOMSG("CommPM: Child " << rep_host << " ready." << endl);
00144 }
00145 else {
00146 ERRORMSG("CommPM: Error with child reporting to parent. ");
00147 ERRORMSG("rep_host = " << rep_host);
00148 ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
00149 }
00150 }
00151
00152 delete [] child_ready;
00153 INFOMSG("CommPM: Initialization complete." << endl);
00154
00155 }
00156 else {
00157 char host_name[MPI_MAX_PROCESSOR_NAME];
00158 ierror = MPI_Get_processor_name(host_name, &result_len);
00159 if (ierror >= 0) {
00160 INFOMSG("CommPM: Started job " << myHost << " on host `");
00161 INFOMSG(host_name << "'." << endl);
00162 }
00163 else {
00164 ERRORMSG("CommPM: failed" << endl);
00165 }
00166
00167
00168 int checknode;
00169 MPI_Recv(&checknode, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD,
00170 &stat);
00171 TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, 0, 1*size_of_MPI_INT);
00172 if (checknode != 0)
00173 WARNMSG("CommPM: Child received bad message during startup." << endl);
00174
00175
00176 MPI_Send(&myHost, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD);
00177 TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, 0, 1*size_of_MPI_INT);
00178 }
00179
00180
00181 if (TotalNodes > 1) {
00182 vector<int> proccount;
00183 proccount.push_back(1);
00184 for (i = 1; i < TotalNodes; i++) {
00185 Contexts.push_back(1);
00186 Processes.push_back(proccount);
00187 }
00188 }
00189
00190 }
00191
00192
00194
00195 CommPM::~CommPM(void) {
00196 TAU_PROFILE("CommPM::~CommPM()", "void()", TAU_MESSAGE);
00197 int i, dieCode = 0;
00198 MPI_Status stat;
00199
00200
00201
00202 if (TotalNodes > 1) {
00203 int trial, node, tag;
00204 Message *msg;
00205 for (trial = 0; trial < 50000; ++trial) {
00206 do {
00207 node = COMM_ANY_NODE;
00208 tag = COMM_ANY_TAG;
00209 msg = myreceive(node, tag, COMM_SEND_TAG);
00210 if (msg != 0) {
00211 WARNMSG("CommPM: Found extra message from node " << node);
00212 WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
00213 }
00214 } while (msg != 0);
00215 }
00216 }
00217
00218
00219 if (myNode() == 0) {
00220
00221 for (i = 1; i < TotalNodes; i++)
00222 {
00223 MPI_Send(&dieCode, 1, MPI_INT, i, COMM_DIE_TAG, MPI_COMM_WORLD);
00224 TAU_TRACE_SENDMSG(COMM_DIE_TAG, i, 1*size_of_MPI_INT);
00225 }
00226 } else {
00227
00228 MPI_Recv(&dieCode, 1, MPI_INT, 0, COMM_DIE_TAG, MPI_COMM_WORLD, &stat);
00229 TAU_TRACE_RECVMSG(COMM_DIE_TAG, 0, 1*size_of_MPI_INT);
00230 }
00231
00232
00233 _score_free_subnet();
00234
00235 MPI_Finalize();
00236 }
00237
00238
// Header placed at the start of every PM packet built by mysend; the
// packed Message data follows immediately after it in the same buffer.
struct PM_Message {
  int node;   // sending node (mysend stores myHost here)
  int tag;    // tag that was passed to mysend
};
00243
#if 0
// DISABLED: this routine is compiled out.
//
// Packs 'msg' into a contiguous buffer and returns a pointer to it.  The
// required size is stored in 'buffsize'.  The shared buffer 'mpipackbuf'
// is used unless the message needs more than PACKSIZE bytes, in which
// case makebuffer supplies a buffer of the required size.
void *CommPM::pack_message(Message *msg, int tag, int &buffsize) {
  TAU_PROFILE("CommPM::pack_message()", "(Message *, int, int)",
              TAU_MESSAGE);

  // Work out how much room the packed message needs.
  buffsize = find_msg_length(*msg);

  // Pick the destination buffer.
  void *packed;
  if (buffsize > PACKSIZE) {
    packed = makebuffer(buffsize);
  } else {
    packed = mpipackbuf;
  }

  // Serialize the message into it.
  fill_msg_buffer(packed, *msg, tag);
  return packed;
}
#endif
00270
00272
00273
00274
00275
00276
00277
00278 bool CommPM::mysend(Message *msg, int node, int tag, int etag) {
00279 TAU_PROFILE("CommPM::mysend()", "bool (Message *, int, int, int)",
00280 TAU_MESSAGE);
00281
00282 int nerr = 0;
00283
00284
00285
00286
00287 int length;
00288 length = find_msg_length(*msg) + sizeof(PM_Message);
00289
00290
00291
00292
00293 PM_Message* msgbuf;
00294 int timeout_counter = 0;
00295 while (_pmGetSendBuf(pm_network, (caddr_t*)&msgbuf, length) == ENOBUFS) {
00296 timeout_counter++;
00297 if (timeout_counter > 10000000) {
00298 ERRORMSG("CommPM: _pmGetSendBuf TIMEOUT");
00299 timeout_counter = 0;
00300 nerr++;
00301 }
00302
00303 nerr += pickup_message();
00304 }
00305
00306
00307
00308 msgbuf->node = myHost;
00309 msgbuf->tag = tag;
00310
00311 fill_msg_buffer((void*) (&(msgbuf->tag) + 1), *msg, tag);
00312
00313
00314
00315 if (_pmSend(pm_network, node)) {
00316 ERRORMSG("CommPM: _pmSend Error");
00317 nerr++;
00318 }
00319
00320 while (_pmSendDone(pm_network) == EBUSY) {
00321 nerr += pickup_message();
00322 }
00323
00324
00325
00326 return (nerr == 0);
00327 }
00328
00329 int CommPM::pickup_message(void) {
00330 int nerr = 0;
00331 int length;
00332 PM_Message* msgbuf;
00333 int error;
00334 Message* newmsg = 0;
00335
00336 int src_node, rec_size, rec_utag;
00337 void* rec_buff;
00338
00339
00340 if (error = _pmReceive(pm_network, (caddr_t*)&msgbuf, (size_t*)&length)) {
00341
00342 if (error != ENOBUFS) {
00343 ERRORMSG("CommPM: _pmReceive Error (in pickup_message)");
00344 nerr++;
00345 }
00346 } else {
00347
00348 TAU_TRACE_RECVMSG(rec_tag, src_node, rec_size);
00349 src_node = msgbuf->node;
00350
00351 rec_size = length - sizeof(PM_Message);
00352 rec_buff = makebuffer(rec_size);
00353
00354 memcpy(rec_buff, (void*) (&(msgbuf->tag) + 1), rec_size);
00355 _pmPutReceiveBuf(pm_network);
00356 newmsg = unpack_message(src_node, rec_utag, rec_buff);
00357 newmsg->useCommunicate(this, rec_buff);
00358
00359
00360 if (add_msg(newmsg, src_node, rec_utag)) {
00361 newmsg = NULL;
00362 }
00363 }
00364
00365
00366 return nerr;
00367 }
00368
00369
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381 Message *CommPM::myreceive(int& node, int& tag, int etag) {
00382 TAU_PROFILE("CommPM::myreceive()", "Message *(int, int, int)", TAU_MESSAGE);
00383 int error;
00384 PM_Message* msgbuf;
00385 int length;
00386 int rec_tag, src_node, rec_size, rec_utag;
00387 void* rec_buff;
00388 Message* newmsg = 0;
00389
00390
00391 if (error = _pmReceive(pm_network, (caddr_t*)&msgbuf, (size_t*)&length)) {
00392
00393 if (error != ENOBUFS) {
00394 ERRORMSG("CommPM: _pmReceive Error (in myreceive)");
00395 } else {
00396
00397 DEBUGMSG(level2<<"CommPM: No Message Received to Match Request"<<endl);
00398 }
00399 } else {
00400
00401 TAU_TRACE_RECVMSG(rec_tag, src_node, rec_size);
00402 src_node = msgbuf->node;
00403 rec_tag = msgbuf->tag;
00404 rec_size = length - sizeof(PM_Message);
00405 rec_buff = makebuffer(rec_size);
00406
00407 memcpy(rec_buff, (void*) (&(msgbuf->tag) + 1), rec_size);
00408 _pmPutReceiveBuf(pm_network);
00409 newmsg = unpack_message(src_node, rec_utag, rec_buff);
00410 newmsg->useCommunicate(this, rec_buff);
00411 }
00412
00413
00414 node = src_node;
00415 tag = rec_tag;
00416 return newmsg;
00417 }
00418
00419
00421
00422
00423
00424 void CommPM::mybarrier(void) {
00425 TAU_PROFILE("CommPM::mybarrier()", "void ()", TAU_MESSAGE);
00426
00427 MPI_Barrier(MPI_COMM_WORLD);
00428 }
00429
00430
00432
00433 void CommPM::cleanupMessage(void *d) {
00434 TAU_PROFILE("CommPM::cleanupMessage()", "void (void *)", TAU_MESSAGE);
00435
00436
00437 freebuffer(d);
00438 }
00439
00440
00441
00442
00443
00444
00445