00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00027
00028
00029
00030
00032
00033
00034 #include "Message/Communicate.h"
00035 #include "Message/Message.h"
00036 #include "Message/CRC.h"
00037 #include "PETE/IpplExpressions.h"
00038 #include "Profile/Profiler.h"
00039 #include "Utility/IpplInfo.h"
00040 #include "Utility/IpplStats.h"
00041 #include "Utility/RandomNumberGen.h"
00042 #include "Utility/PAssert.h"
00043 #include <stdio.h>
00044
00045
00047
00048 ostream& operator<<(ostream& o, const Communicate& c)
00049 {
00050 TAU_PROFILE("operator<<()", "ostream (ostream, Communicate)",
00051 TAU_MESSAGE | TAU_IO);
00052
00053 o << "Parallel communication method: " << c.name() << "\n";
00054 o << " Total nodes: " << c.getNodes() << ", Current node: ";
00055 o << c.myNode() << "\n";
00056 o << " Queued received messages: ";
00057 o << c.getReceived() <<"\n";
00058
00059 return o;
00060 }
00061
00062
00064
00065
00066
00067
00068
00069
00070
00071
00072 Communicate::Communicate(int, char **, int)
00073 : nextMsgNum(1)
00074 {
00075 TAU_PROFILE("Communicate::Communicate()", "void (int, char **, int)",
00076 TAU_MESSAGE);
00077
00078
00079 TotalNodes = 1;
00080 myHost = 0;
00081 ErrorStatus = COMM_NOERROR;
00082 Contexts.push_back(1);
00083 Processes.push_back(Contexts);
00084 }
00085
00086
00088
00089 Communicate::~Communicate(void)
00090 {
00091 TAU_PROFILE("Communicate::~Communicate()", "void ()", TAU_MESSAGE);
00092
00093
00094 SentCache_t::iterator cachei = sentMsgCache.begin();
00095 for ( ; cachei != sentMsgCache.end(); ++cachei)
00096 (*cachei).second.freebuf();
00097 }
00098
00099
00101
00102 bool Communicate::add_msg(Message *msg, int node, int tag)
00103 {
00104 TAU_PROFILE("Communicate::add_msg()", "bool (Message *, int, int)",
00105 TAU_MESSAGE);
00106
00107 recMsgList.push_back(MessageData(node, tag, msg));
00108 return true;
00109 }
00110
00111
00113
00114
00115
00116
00117
00118 Message* Communicate::find_msg(int& node, int& tag)
00119 {
00120 TAU_PROFILE("Communicate::find_msg()", "Message* (int&, int&)",
00121 TAU_MESSAGE);
00122
00123
00124 vector<MessageData>::iterator qi = recMsgList.begin();
00125 vector<MessageData>::iterator qend = recMsgList.end();
00126 for ( ; qi != qend ; ++qi) {
00127 if ((node == COMM_ANY_NODE || (*qi).node == node) &&
00128 (tag == COMM_ANY_TAG || (*qi).tag == tag)) {
00129 node = (*qi).node;
00130 tag = (*qi).tag;
00131 Message *retval = (*qi).msg;
00132 recMsgList.erase(qi);
00133
00134
00135 return retval;
00136 }
00137 }
00138
00139
00140 return 0;
00141 }
00142
00143
00145
00146 bool Communicate::mysend(Message *, int, int, int)
00147 {
00148 TAU_PROFILE("Communicate::mysend()", "bool (Message *, int, int, int)",
00149 TAU_MESSAGE);
00150
00151
00152 return false;
00153 }
00154
00155
00157
00158 Message* Communicate::myreceive(int&, int&, int)
00159 {
00160 TAU_PROFILE("Communicate::myreceive()", "Message* (int, int, int)",
00161 TAU_MESSAGE);
00162
00163
00164 return 0;
00165 }
00166
00167
00169
00170 void Communicate::mybarrier(void)
00171 {
00172 TAU_PROFILE("Communicate::mybarrier()", "void ()", TAU_MESSAGE);
00173
00174
00175 return;
00176 }
00177
00178
00180
00181
00182 bool Communicate::resend(void *, int, int, int)
00183 {
00184 TAU_PROFILE("Communicate::resend()", "void (void *, int, int, int)",
00185 TAU_MESSAGE);
00186
00187
00188 return false;
00189 }
00190
00191
00193
00194
00195 bool Communicate::send(Message *msg, int node, int tag, bool delmsg)
00196 {
00197 TAU_PROFILE("Communicate::send()", "bool (Message *, int, int, bool)",
00198 TAU_MESSAGE);
00199
00200 bool retval;
00201
00202
00203
00204
00205
00206 if ( node < 0 || node >= getNodes() || tag < 0 ) {
00207 ERRORMSG("Communicate: illegal send node " << node << endl);
00208 ErrorStatus = COMM_ERROR;
00209 return false;
00210 }
00211
00212
00213
00214 if ( node == myNode() ) {
00215 retval = add_msg(msg, node, tag);
00216
00217
00218 } else {
00219
00220 retval = mysend(msg, node, tag, COMM_SEND_TAG);
00221
00222
00223 if (retval) {
00224
00225
00226 if (delmsg)
00227 delete msg;
00228 }
00229 }
00230
00231
00232 ErrorStatus = (retval != 0 ? COMM_NOERROR : COMM_NOSEND);
00233
00234
00235 return retval;
00236 }
00237
00238
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252 Message* Communicate::receive(int& node, int& tag)
00253 {
00254 TAU_PROFILE("Communicate::receive()", "Message* (int, int)", TAU_MESSAGE);
00255
00256
00257
00258
00259
00260 process_resend_requests();
00261
00262
00263
00264 Message *msg = find_msg(node, tag);
00265
00266
00267 if (msg == 0 && myNode() != node) {
00268 int checknode = node;
00269 int checktag = tag;
00270
00271
00272 if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0) {
00273
00274
00275
00276 if ((node != COMM_ANY_NODE && node != checknode) ||
00277 (tag != COMM_ANY_TAG && tag != checktag )) {
00278
00279
00280 add_msg(msg, checknode, checktag);
00281 msg = 0;
00282 } else {
00283
00284
00285 node = checknode;
00286 tag = checktag;
00287
00288
00289 }
00290 } else {
00291
00292 }
00293 }
00294
00295
00296 ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
00297
00298
00299 return msg;
00300 }
00301
00302
00304
00305 Message *Communicate::receive_block(int& node, int &tag)
00306 {
00307 TAU_TYPE_STRING(taustr, "Message *(int, int)" );
00308 TAU_PROFILE("Communicate::receive_block()", taustr, TAU_MESSAGE);
00309 TAU_PROFILE_TIMER(findtimer, " Comm::recblock::find", taustr, TAU_MESSAGE);
00310 TAU_PROFILE_TIMER(looptimer, " Comm::recblock::loop", taustr, TAU_MESSAGE);
00311
00312
00313 process_resend_requests();
00314
00315
00316
00317
00318
00319
00320
00321 TAU_PROFILE_START(findtimer);
00322 Message *msg = find_msg(node, tag);
00323 TAU_PROFILE_STOP(findtimer);
00324
00325
00326
00327 TAU_PROFILE_START(looptimer);
00328 if (myNode() != node) {
00329 while (msg == 0) {
00330
00331 process_resend_requests();
00332
00333 int checknode = node;
00334 int checktag = tag;
00335
00336
00337 if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0) {
00338
00339
00340
00341 if ((node != COMM_ANY_NODE && node != checknode) ||
00342 (tag != COMM_ANY_TAG && tag != checktag )) {
00343
00344
00345 add_msg(msg, checknode, checktag);
00346 msg = 0;
00347 } else {
00348
00349
00350 node = checknode;
00351 tag = checktag;
00352
00353
00354 }
00355 } else {
00356
00357 if (Ippl::retransmit())
00358 msg = find_msg(node, tag);
00359 }
00360 }
00361 }
00362 TAU_PROFILE_STOP(looptimer);
00363
00364
00365
00366 PInsist(!(myNode() == node && msg == 0),
00367 "Local message not found in Communicate::receive_block!!");
00368
00369
00370 ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
00371
00372
00373 return msg;
00374 }
00375
00376
00378
00379
00380
00381 int Communicate::broadcast_all(Message *msg, int tag)
00382 {
00383 TAU_PROFILE("Communicate::broadcast_all()", "int (Message *, int)",
00384 TAU_MESSAGE);
00385 int i;
00386
00387
00388 for (i=(getNodes() - 1); i >= 0; i--) {
00389 if (i != myNode()) {
00390 mysend(msg, i, tag, COMM_SEND_TAG);
00391
00392
00393 }
00394 }
00395
00396
00397 add_msg(msg, myNode(), tag);
00398
00399
00400
00401 return getNodes();
00402 }
00403
00404
00406
00407
00408
00409
00410 int Communicate::broadcast_others(Message *msg, int tag, bool delmsg)
00411 {
00412 TAU_PROFILE("Communicate::broadcast_others()", "int (Message *, int, bool)",
00413 TAU_MESSAGE);
00414 int i;
00415
00416
00417 for (i=(getNodes() - 1); i >= 0; i--) {
00418 if (i != myNode()) {
00419 mysend(msg, i, tag, COMM_SEND_TAG);
00420
00421
00422 }
00423 }
00424
00425
00426 if (delmsg)
00427 delete msg;
00428
00429 return getNodes() - 1;
00430 }
00431
00432
00434
00435
00436 void Communicate::barrier()
00437 {
00438 TAU_PROFILE("Communicate::barrier()", "void ()", TAU_MESSAGE);
00439
00440 mybarrier();
00441
00442 }
00443
00444
00446
00447
00448 void Communicate::cleanupMessage(void *) { }
00449
00450
00452
00453 int Communicate::find_msg_length(Message &msg)
00454 {
00455 TAU_PROFILE("Communicate::find_msg_length()", "int (Message)",
00456 TAU_MESSAGE);
00457
00458 static const int longsize = wordround(sizeof(MsgNum_t));
00459 static const int intsize4 = wordround(4 * sizeof(int));
00460 static const int intsize2 = wordround(2 * sizeof(int));
00461
00462
00463
00464 int buffsize = longsize + intsize4;
00465
00466
00467
00468 int nitems = msg.size();
00469 for (int i=0; i < nitems; ++i)
00470 buffsize += (intsize2 + wordround(msg.item(i).numBytes()));
00471
00472
00473 if (Ippl::useChecksums())
00474 buffsize += sizeof(CRCTYPE);
00475
00476 return buffsize;
00477 }
00478
00479
00481
00482 void Communicate::fill_msg_buffer(void *buffer, Message &msg, int tag,
00483 int bufsize, int node)
00484 {
00485 TAU_PROFILE("Communicate::fill_msg_buffer()",
00486 "void (void *, Message, int, int, int)", TAU_MESSAGE);
00487
00488 void *pos = buffer;
00489 int nitems = msg.size();
00490 int mdata[4];
00491 MsgNum_t mnum = (nextMsgNum++);
00492
00493
00494
00495
00496
00497
00498
00499
00500 if (Ippl::useChecksums())
00501 memset(pos, 0, bufsize);
00502
00503
00504 pack(&mnum, pos, sizeof(MsgNum_t));
00505
00506
00507 mdata[0] = tag;
00508 mdata[1] = myNode();
00509 mdata[2] = nitems;
00510 mdata[3] = bufsize;
00511 pack(mdata, pos, 4*sizeof(int));
00512
00513
00514 for (int i=0; i < nitems; ++i) {
00515 Message::MsgItem &msgitem = msg.item(i);
00516 mdata[0] = msgitem.numElems();
00517 mdata[1] = msgitem.numBytes();
00518 pack(mdata, pos, 2*sizeof(int));
00519 if (mdata[1] > 0)
00520 pack(msgitem.data(), pos, mdata[1]);
00521 }
00522
00523
00524 if (Ippl::useChecksums()) {
00525
00526 int crcsize = bufsize - sizeof(CRCTYPE);
00527 CRCTYPE crcval = crc(buffer, crcsize);
00528
00529
00530 *(static_cast<CRCTYPE *>(pos)) = crcval;
00531
00532
00533 if (Ippl::retransmit()) {
00534 if (tag != IPPL_RETRANSMIT_TAG && tag != IPPL_MSG_OK_TAG) {
00535
00536
00537 add_to_send_cache(buffer, mnum, bufsize, node);
00538 } else {
00539
00540 }
00541 }
00542 }
00543
00544 ADDIPPLSTAT(incMessageBytesSent,bufsize);
00545 }
00546
00547
00549
00550 Message* Communicate::unpack_message(int &node, int &tag, void *buffer)
00551 {
00552 TAU_PROFILE("Communicate::unpack_message()",
00553 "Message* (int, int, long, void*)", TAU_MESSAGE);
00554
00555 Message *newmsg = 0;
00556
00557
00558 void *pos = buffer;
00559
00560
00561 MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
00562 addwordround(pos, sizeof(MsgNum_t));
00563
00564
00565 int *mdata = static_cast<int *>(pos);
00566 tag = mdata[0];
00567 node = mdata[1];
00568 int nitems = mdata[2];
00569 int bufsize = mdata[3];
00570 addwordround(pos, 4*sizeof(int));
00571
00572
00573
00574
00575
00576 if (tag == IPPL_ABORT_TAG) {
00577 ERRORMSG("Stopping due to abort request sent from node " << node << endl);
00578 ::abort();
00579
00580 } else if (tag == IPPL_EXIT_TAG) {
00581 ERRORMSG("Exiting due to exit request sent from node " << node << endl);
00582 ::exit(1);
00583
00584 } else if (tag == IPPL_RETRANSMIT_TAG) {
00585
00586 unpack_retransmission_request(nitems, pos);
00587
00588 } else if (tag == IPPL_MSG_OK_TAG) {
00589
00590 clear_ok_messages(nitems, pos);
00591
00592 } else {
00593
00594
00595
00596 if (Ippl::useChecksums()) {
00597
00598 int crcsize = bufsize - sizeof(CRCTYPE);
00599 CRCTYPE crcval = crc(buffer, crcsize);
00600
00601
00602
00603
00604
00605
00606 void *origloc = static_cast<void *>(static_cast<char *>(buffer)+crcsize);
00607 CRCTYPE origcrc = *(static_cast<CRCTYPE *>(origloc));
00608 if (crcval != origcrc) {
00609 ERRORMSG("Failed CRC check (" << crcval << " != " << origcrc);
00610 ERRORMSG(") on node " << Ippl::myNode());
00611 ERRORMSG(" for message " << mnum << " of size " << bufsize);
00612 ERRORMSG(" bytes sent from node ");
00613 ERRORMSG(node << " with tag " << tag << endl);
00614 if (Ippl::retransmit()) {
00615
00616 requestList.push_back(pair<int,MsgNum_t>(node, mnum));
00617 } else {
00618
00619 PInsist(crcval == origcrc, "Exiting due to CRC check failure.");
00620 }
00621
00622
00623 return 0;
00624 }
00625 }
00626
00627
00628
00629
00630
00631 newmsg = new Message(nitems);
00632
00633
00634 for (int j = 0; j < nitems; j++) {
00635 int *hdr = static_cast<int *>(pos);
00636 int elements = hdr[0];
00637 int bytesize = hdr[1];
00638 addwordround(pos, 2*sizeof(int));
00639
00640
00641
00642
00643
00644 if (bytesize > 0 && elements > 0) {
00645 newmsg->setCopy(false);
00646 newmsg->setDelete(false);
00647 newmsg->putmsg(pos, bytesize/elements, elements);
00648 addwordround(pos, bytesize);
00649 }
00650 }
00651
00652
00653 ADDIPPLSTAT(incMessageBytesReceived,bufsize);
00654
00655
00656 if (Ippl::retransmit())
00657 informOKList.push_back(pair<int,MsgNum_t>(node, mnum));
00658 }
00659
00660
00661
00662 return newmsg;
00663 }
00664
00665
00667
00668
00669 void Communicate::add_to_send_cache(void *msgbuf, MsgNum_t mnum, int msgsize,
00670 int node)
00671 {
00672 TAU_PROFILE("Communicate::add_to_send_cache",
00673 "void (void*, MsgNum_t, int)", TAU_MESSAGE);
00674
00675
00676 SentCache_t::iterator senti = sentMsgCache.find(mnum);
00677 if (senti != sentMsgCache.end()) {
00678 ERRORMSG("ERROR: Trying to cache an already-cached message with num = ");
00679 ERRORMSG(mnum << endl);
00680 return;
00681 }
00682
00683
00684
00685
00686
00687
00688 char *copybuf = new char[msgsize];
00689 memcpy(copybuf, msgbuf, msgsize);
00690
00691
00692 CommSendInfo csi(msgsize, copybuf, node);
00693 sentMsgCache.insert(SentCache_t::value_type(mnum, csi));
00694
00695
00696
00697 }
00698
00699
00701
00702 void Communicate::request_retransmission(int node, MsgNum_t mnum)
00703 {
00704 Inform dbgmsg("***Communicate::request_retransmission", INFORM_ALL_NODES);
00705 dbgmsg << "Requesting retransmission of message " << mnum << " from node ";
00706 dbgmsg << node << endl;
00707
00708
00709
00710
00711 Message msg(1);
00712 msg.put(mnum);
00713 send(&msg, node, IPPL_RETRANSMIT_TAG, false);
00714 }
00715
00716
00718
00719
00720 void Communicate::unpack_retransmission_request(int nitems, void *pos)
00721 {
00722 Inform dbgmsg("***Communicate::unpack_retrans_req", INFORM_ALL_NODES);
00723
00724
00725
00726
00727
00728
00729 PInsist(nitems == 1, "Wrong number of items in retransmit request.");
00730
00731
00732 int *hdr = static_cast<int *>(pos);
00733 PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
00734 "Wrong message info for retransmit message number.");
00735 addwordround(pos, 2*sizeof(int));
00736
00737
00738 MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
00739 dbgmsg << "Received request to resend message " << mnum << endl;
00740 resendList.push_back(mnum);
00741 }
00742
00743
00745
00746 void Communicate::perform_resend(MsgNum_t mnum)
00747 {
00748
00749 SentCache_t::iterator senti = sentMsgCache.find(mnum);
00750 PInsist(senti != sentMsgCache.end(),
00751 "Could not find message in local sent cache to retransmit.");
00752
00753
00754 int size = (*senti).second.size();
00755 int node = (*senti).second.node();
00756
00757
00758 ERRORMSG("WARNING: Resending message number " << mnum);
00759 ERRORMSG(" of size " << size << " from node ");
00760 ERRORMSG(myNode() << " to node " << node << " ..." << endl);
00761 resend((*senti).second.buf(), size, node, COMM_SEND_TAG);
00762 }
00763
00764
00766
00767 void Communicate::send_ok_message(int node, MsgNum_t mnum)
00768 {
00769 Inform dbgmsg("***Communicate::send_ok_message", INFORM_ALL_NODES);
00770 dbgmsg << "Informing node " << node << " that message " << mnum;
00771 dbgmsg << " was received ok." << endl;
00772
00773 Message msg(1);
00774 msg.put(mnum);
00775
00776 send(&msg, node, IPPL_MSG_OK_TAG, false);
00777 }
00778
00779
00781
00782
00783 void Communicate::clear_ok_messages(int nitems, void *pos)
00784 {
00785 Inform dbgmsg("***Communicate::clear_ok_messages", INFORM_ALL_NODES);
00786
00787
00788
00789
00790
00791
00792
00793
00794
00795 PInsist(nitems >= 1, "Wrong number of items in retransmit request.");
00796
00797
00798
00799 for (int i=0; i < nitems; ++i) {
00800
00801 int *hdr = static_cast<int *>(pos);
00802 PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
00803 "Wrong message info for message-ok number.");
00804 addwordround(pos, 2*sizeof(int));
00805
00806
00807 MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
00808 addwordround(pos, sizeof(MsgNum_t));
00809
00810
00811 dbgmsg << "Will clear message " << mnum << " as OK." << endl;
00812 sentOKList.push_back(mnum);
00813 }
00814 }
00815
00816
00818
00819
00820 void Communicate::remove_single_ok_message(MsgNum_t mnum)
00821 {
00822 Inform dbgmsg("***Communicate::remove_single_ok_message", INFORM_ALL_NODES);
00823
00824
00825 SentCache_t::iterator senti = sentMsgCache.find(mnum);
00826 if (senti == sentMsgCache.end()) {
00827
00828 ERRORMSG("ERROR: Received 'message ok' for message " << mnum);
00829 ERRORMSG(", but this node does not have that message in cache." << endl);
00830
00831 } else {
00832
00833 (*senti).second.freebuf();
00834 sentMsgCache.erase(senti);
00835 dbgmsg << "Removed message " << mnum << " from send cache; now ";
00836 dbgmsg << sentMsgCache.size() << " messages in list." << endl;
00837 }
00838 }
00839
00840
00842
00843 void Communicate::process_resend_requests()
00844 {
00845 if (resendList.size() > 0) {
00846 Inform dbgmsg("***Communicate::process_resend_reqs", INFORM_ALL_NODES);
00847 dbgmsg << "Clearing " << sentOKList.size() << " and resending ";
00848 dbgmsg << resendList.size() << " messages ..." << endl;
00849 }
00850
00851
00852 while (sentOKList.size() > 0) {
00853 MsgNum_t mnum = *(sentOKList.begin());
00854 sentOKList.erase(sentOKList.begin());
00855 remove_single_ok_message(mnum);
00856 }
00857
00858
00859 while (resendList.size() > 0) {
00860 MsgNum_t mnum = *(resendList.begin());
00861 resendList.erase(resendList.begin());
00862 perform_resend(mnum);
00863 }
00864
00865
00866 while (informOKList.size() > 0) {
00867 int node = (*(informOKList.begin())).first;
00868 MsgNum_t mnum = (*(informOKList.begin())).second;
00869 informOKList.erase(informOKList.begin());
00870 send_ok_message(node, mnum);
00871 }
00872
00873
00874 while (requestList.size() > 0) {
00875 int node = (*(requestList.begin())).first;
00876 MsgNum_t mnum = (*(requestList.begin())).second;
00877 requestList.erase(requestList.begin());
00878 request_retransmission(node, mnum);
00879 }
00880 }
00881
00882
00883
00884
00885
00886
00887