00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include "Message/CommPVM.h"
00034 #include "Message/Message.h"
00035 #include "Utility/IpplInfo.h"
00036 #include "Profile/Profiler.h"
00037
00038
00039 #include <pvm3.h>
00040
00041 #include <string.h>
00042 #include <stdlib.h>
00043 #include <unistd.h>
00044
00045
00046
00047
00048
// Upper limit on consecutive PVM receive errors; myreceive() calls exit(1)
// once numErrors exceeds this value (the check is disabled if it is <= 0).
00049 #define MAX_PVM_ERRS 500
00050
00051
00052
// Shorthand for a C string pointer, used for the argv-style arrays in the
// constructor.
00053 typedef char *charptr;
00054
00055
// Number of receive errors seen since the last successful receive;
// myreceive() resets it to 0 after every successfully received message.
00056 static int numErrors = 0;
00057
00058
00060
00061
00062
00063
00064
00065
00066 CommPVM::CommPVM(int argc , char** argv, int procs)
00067 : Communicate(argc, argv, procs) {
00068 TAU_PROFILE("CommPVM::CommPVM()", "void (int, char **, int)", TAU_MESSAGE);
00069 charptr *useargs = argv;
00070
00071
00072 int i, startNum, mytid, parent_tid, testnumhosts, added;
00073 int *child_ready, reported, rep_host, rep_tid;
00074 charptr *spawnargv = 0;
00075
00076
00077 execName = NULL;
00078 tids = NULL;
00079
00080
00081 int ErrRet = pvm_joingroup("all_tasks");
00082 if (ErrRet==PvmSysErr) {
00083 ERRORMSG("CommPVM: pvmd not running!!" << endl);
00084 }
00085 else if (ErrRet==PvmBadParam) {
00086 ERRORMSG("CommPVM: bad group name!!" << endl);
00087 }
00088 else if (ErrRet==PvmDupGroup) {
00089 ERRORMSG("CommPVM: already joined this group!!" << endl);
00090 }
00091
00092
00093 mytid = pvm_mytid();
00094 if(mytid >= 0) {
00095
00096
00097
00098 struct pvmhostinfo *hostp;
00099 struct pvmtaskinfo *taskp;
00100 parent_tid = pvm_parent();
00101 pvm_setopt(PvmRoute, PvmRouteDirect);
00102
00103
00104
00105
00106 int narch;
00107 if(pvm_config(&TotalNodes, &narch, &hostp) != 0) {
00108
00109 WARNMSG("CommPVM: cannot get virtual machine configuration.\n");
00110 WARNMSG(" Going to single-node status." << endl);
00111 TotalNodes = 1;
00112 parent_tid = (-1);
00113 }
00114
00115
00116 char *exnmptr = strrchr(useargs[0],'/');
00117 if(!exnmptr)
00118 exnmptr = useargs[0];
00119 else
00120 exnmptr++;
00121 execName = new char[strlen(exnmptr) + 1];
00122 strcpy(execName,exnmptr);
00123
00124
00125
00126
00127
00128
00129 if(procs > 0)
00130 startNum = procs;
00131 else
00132 startNum = TotalNodes;
00133
00134
00135 if(startNum == 1)
00136 TotalNodes = 1;
00137
00138
00139 tids = new int[startNum];
00140 if (parent_tid < 0) {
00141
00142 tids[0] = mytid;
00143 added = 1;
00144 myHost = 0;
00145 if(startNum > 1) {
00146
00147
00148
00149
00150 if (argc > 1) {
00151 spawnargv = new charptr[argc];
00152 for(i=0; i < (argc - 1); i++)
00153 spawnargv[i] = useargs[i+1];
00154 spawnargv[argc - 1] = NULL;
00155 }
00156
00157
00158
00159 int ntask;
00160 pvm_tasks(0, &ntask, &taskp);
00161 int pvmd_tid = (taskp[0]).ti_host;
00162 int checked_host_once = 0;
00163
00164
00165 int pn;
00166 for (i=0, pn=0; pn < startNum && added < startNum; pn++) {
00167 if (checked_host_once == 0 && pvmd_tid == (hostp[i]).hi_tid) {
00168 checked_host_once = 1;
00169 }
00170 else {
00171 INFOMSG("CommPVM: Starting job " << added + 1 << " on host '");
00172 INFOMSG((hostp[i]).hi_name << "' ... ");
00173 pvm_spawn(execName, spawnargv, PvmTaskHost,
00174 (hostp[i]).hi_name, 1, tids + added);
00175 if(tids[added] < 0) {
00176 ERRORMSG("failed. PVM Error = " << tids[added] << endl);
00177 }
00178 else {
00179 INFOMSG("new tid = " << tids[added] << endl);
00180 added++;
00181 }
00182 }
00183 i = ((i + 1) % TotalNodes);
00184 }
00185
00186
00187 delete [] spawnargv;
00188
00189
00190 TotalNodes = startNum = added;
00191
00192 pvm_initsend(PvmDataRaw);
00193 pvm_pkint(&TotalNodes,1,1);
00194 pvm_pkint(tids,TotalNodes,1);
00195 pvm_mcast(tids+1,TotalNodes-1,COMM_HOSTS_TAG);
00196
00197 pvm_initsend(PvmDataRaw);
00198
00199
00200 INFOMSG("CommPVM: Parent process waiting for children ..." << endl);
00201 child_ready = new int[TotalNodes];
00202 for(i=0; i < TotalNodes; child_ready[i++] = 0);
00203 reported = 1;
00204 while(reported < TotalNodes) {
00205 if(pvm_nrecv((-1),COMM_HOSTS_TAG) > 0) {
00206 pvm_upkint(&rep_host,1,1);
00207 pvm_upkint(&rep_tid,1,1);
00208 if(rep_host>=0 &&rep_host<TotalNodes &&!(child_ready[rep_host])) {
00209 child_ready[rep_host] = 1;
00210 reported++;
00211 INFOMSG("CommPVM: Child " << rep_host << " (tid = " << rep_tid);
00212 INFOMSG(") ready." << endl);
00213 }
00214 else {
00215 ERRORMSG("CommPVM: Error with child reporting to parent. ");
00216 ERRORMSG("rep_host = " << rep_host << ", rep_tid = " << rep_tid);
00217 ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
00218 }
00219 }
00220 }
00221 delete [] child_ready;
00222 INFOMSG("CommPVM: Initialization complete!" << endl);
00223 }
00224
00225 }
00226 else {
00227
00228 pvm_recv(parent_tid, COMM_HOSTS_TAG);
00229 pvm_upkint(&testnumhosts,1,1);
00230 if(testnumhosts != startNum) {
00231 WARNMSG("CommPVM: For tid " << mytid);
00232 WARNMSG(": Adjusting # of nodes from " << startNum << " to ");
00233 WARNMSG(testnumhosts << endl);
00234 startNum = testnumhosts;
00235 }
00236 pvm_upkint(tids, startNum, 1);
00237 TotalNodes = startNum;
00238
00239
00240 for(i=1; i < startNum; i++)
00241 if(mytid == tids[i]) { myHost = i; break; }
00242
00243
00244 if(i >= startNum) {
00245
00246 pvm_exit();
00247 }
00248
00249
00250 pvm_initsend(PvmDataRaw);
00251 pvm_pkint(&myHost,1,1);
00252 pvm_pkint(&mytid,1,1);
00253 pvm_send(parent_tid,COMM_HOSTS_TAG);
00254 }
00255
00256 }
00257 else {
00258 TotalNodes = 1;
00259 WARNMSG("CommPVM: PVM not running, going to single-node mode." << endl);
00260 }
00261
00262
00263 if (TotalNodes > 1) {
00264 vector<int> proccount;
00265 proccount.push_back(1);
00266 for (i = 1; i < TotalNodes; i++) {
00267 Contexts.push_back(1);
00268 Processes.push_back(proccount);
00269 }
00270 }
00271 }
00272
00273
00275
00276 CommPVM::~CommPVM(void) {
00277 TAU_PROFILE("CommPVM::~CommPVM()", "void ()", TAU_MESSAGE);
00278
00279 int i;
00280
00281
00282
00283 if (TotalNodes > 1) {
00284 int trial, node, tag;
00285 Message *msg;
00286 for (trial = 0; trial < 50000; ++trial) {
00287 do {
00288 node = COMM_ANY_NODE;
00289 tag = COMM_ANY_TAG;
00290 msg = myreceive(node, tag, COMM_SEND_TAG);
00291 if (msg != 0) {
00292 WARNMSG("CommPVM: Found extra message from node " << node);
00293 WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
00294 }
00295 } while (msg != 0);
00296 }
00297 }
00298
00299
00300 if (TotalNodes > 1) {
00301
00302 mybarrier();
00303
00304
00305 if (myHost == 0) {
00306 INFOMSG("CommPVM: Parent process killing spawned processes ..." << endl);
00307 for (i = 1 ; i < TotalNodes; i++)
00308 pvm_kill(tids[i]);
00309
00310 pvm_exit();
00311 if(tids)
00312 delete [] tids;
00313 if(execName)
00314 delete [] execName;
00315
00316 } else {
00317
00318 if(tids)
00319 delete [] tids;
00320 if(execName)
00321 delete [] execName;
00322
00323 while(1)
00324 sleep(1);
00325 }
00326 }
00327 }
00328
00329
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341 void CommPVM::pack_message(Message *msg, int tag) {
00342 TAU_PROFILE("CommPVM::pack_message()", "void (Message *, int)", TAU_MESSAGE);
00343 int mdata[3];
00344 int nitems = msg->size();
00345
00346
00347 mdata[0] = tag;
00348 mdata[1] = myNode();
00349 mdata[2] = nitems;
00350 pvm_pkint(mdata, 3, 1);
00351
00352
00353 Message::iterator mbeg = msg->begin();
00354 Message::iterator mend = msg->end();
00355 for ( ; mbeg != mend; ++mbeg) {
00356 Message::MsgItem *msg = *mbeg;
00357 mdata[0] = msg->elemsize;
00358 mdata[1] = msg->bytesize;
00359 pvm_pkint(mdata, 2, 1);
00360 if (mdata[1] > 0)
00361 pvm_pkbyte((char *)(msg->item), mdata[1], 1);
00362 }
00363 }
00364
00365
00367
00368
00369
00370
00371 Message* CommPVM::unpack_message(int &node, int &tag) {
00372 TAU_PROFILE("CommPVM::unpack_message()", "Message * (int, int)", TAU_MESSAGE);
00373 int mdata[3];
00374 int j, nitems;
00375
00376
00377 pvm_upkint(mdata, 3, 1);
00378 tag = mdata[0];
00379 node = mdata[1];
00380 nitems = mdata[2];
00381
00382
00383 Message *newmsg = new Message;
00384
00385
00386 for (j = 0; j < nitems; j++) {
00387 pvm_upkint(mdata, 2, 1);
00388 int elemsize = mdata[0];
00389 int bytesize = mdata[1];
00390
00391
00392
00393
00394 if ( bytesize > 0 && elemsize > 0 ) {
00395
00396
00397 char* itemdata = (char*) malloc(bytesize);
00398 pvm_upkbyte(itemdata, bytesize, 1);
00399 newmsg->setCopy(false);
00400 newmsg->setDelete(true);
00401 newmsg->putmsg((void *)itemdata, elemsize, bytesize / elemsize );
00402 }
00403 }
00404
00405
00406 return newmsg;
00407 }
00408
00409
00411
00412
00413
00414
00415
00416
00417 bool CommPVM::mysend(Message *msg, int node, int tag, int etag) {
00418 TAU_PROFILE("CommPVM::mysend()", "bool (Message *, int, int, int)", TAU_MESSAGE);
00419 int newbufid, errstat = (-1);
00420
00421
00422 newbufid = pvm_initsend(PvmDataRaw);
00423 if ( newbufid >= 0 ) {
00424
00425 pack_message(msg, tag);
00426
00427
00428 errstat = pvm_send(tids[node], etag);
00429 }
00430 else {
00431 ERRORMSG("CommPVM: Error in pvm_initsend = " << newbufid << endl);
00432 return false;
00433 }
00434
00435
00436 return (errstat == 0);
00437 }
00438
00439
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451 Message *CommPVM::myreceive(int& node, int& tag, int etag) {
00452 TAU_PROFILE("CommPVM::myreceive()", "Message * (int, int, int)", TAU_MESSAGE);
00453 int bufid, size, checknode, checktag;
00454 Message *newmsg = NULL;
00455
00456 checknode = (node < 0 || node >= TotalNodes ? (-1) : tids[node]);
00457 checktag = etag;
00458
00459
00460 if ((bufid = pvm_getrbuf()) > 0)
00461 pvm_freebuf(bufid);
00462
00463
00464 bufid = pvm_probe(checknode, checktag);
00465
00466
00467 if (bufid > 0) {
00468
00469
00470 if (pvm_bufinfo(bufid, &size, &checktag, &checknode) < 0) {
00471 ERRORMSG("CommPVM: myreceive shows msg, but ");
00472 ERRORMSG("cannot get info about this buffer." << endl);
00473 ERRORMSG(" id = " << bufid << endl);
00474 }
00475 else if (size <= 0) {
00476 ERRORMSG("CommPVM: received message has size " << size << endl);
00477 }
00478 else {
00479
00480
00481 bufid = pvm_recv(checknode, checktag);
00482 newmsg = unpack_message(node, tag);
00483 pvm_freebuf(bufid);
00484 numErrors = 0;
00485 }
00486
00487 }
00488 else if (bufid < 0) {
00489
00490 ERRORMSG("CommPVM: myreceive cannot receive msg from node " << node);
00491 ERRORMSG(", tag " << tag << endl);
00492
00493 if (MAX_PVM_ERRS > 0 && ++numErrors > MAX_PVM_ERRS) {
00494 ERRORMSG("Maximum number of PVM receive errors (" << numErrors);
00495 ERRORMSG(") exceeded. PVM is hosed!!" << endl);
00496 exit(1);
00497 }
00498 }
00499
00500
00501 return newmsg;
00502 }
00503
00504
00506
00507
00508
00509 void CommPVM::mybarrier(void) {
00510 TAU_PROFILE("CommPVM::mybarrier()", "void ()", TAU_MESSAGE);
00511 pvm_barrier("all_tasks", TotalNodes);
00512 return;
00513 }
00514
00515
00516
00517
00518