00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "Message/CommACLMPL.h"
00028 #include "Message/Message.h"
00029 #include "Profile/Profiler.h"
00030 #include "Utility/IpplInfo.h"
00031 #include "Utility/PAssert.h"
00032
00033 #include <string.h>
00034 #include <stdlib.h>
00035 #include <unistd.h>
00036
00037
00038
00039
// NOTE(review): MAX_ACLMPL_ERRS is not referenced anywhere in the visible
// part of this file. The name suggests an upper bound on the number of
// ACLMPL errors tolerated before giving up -- confirm its users (or remove
// it) before relying on this value.
00040 #define MAX_ACLMPL_ERRS 500
00041
00043
00044
00045
00046
00047
00048 CommACLMPL::CommACLMPL(int argc, char** argv, int procs)
00049 : Communicate(argc, argv, procs) {
00050 TAU_PROFILE("CommACLMPL::CommACLMPL()", "void (int, char **, int)",
00051 TAU_MESSAGE);
00052 int i, rep_host, reported, ierror=0;
00053
00054 if (procs < 0) procs = 0;
00055 ACLMPL_initialize(procs);
00056 myHost = (int) ACLMPL_self();
00057 TotalNodes = (int) ACLMPL_npes();
00058
00059
00060 if (procs > 0 && procs < TotalNodes) {
00061
00062 if (myHost >= procs)
00063 Ippl::abort();
00064 TotalNodes = procs;
00065 }
00066
00067 int msize = wordround(sizeof(int));
00068 void *sendbuf = makebuffer(msize);
00069 void *recbuff = makebuffer(msize);
00070 void *pos;
00071
00072 if (myHost == 0) {
00073
00074 pos = sendbuf;
00075 pack(&myHost, pos, sizeof(int));
00076 for (i=1; i<TotalNodes; i++)
00077 ACLMPL_send(i, sendbuf, msize);
00078
00079
00080 INFOMSG("CommACLMPL: Parent process waiting for children ..." << endl);
00081 int *child_ready = new int[TotalNodes];
00082 for (i=0; i<TotalNodes; i++) child_ready[i] = 0;
00083 reported = 1;
00084
00085 while (reported<TotalNodes) {
00086 ierror = ACLMPL_recv(reported,recbuff,msize);
00087 if (ierror>0) {
00088 pos = recbuff;
00089 unpack(pos,&rep_host,sizeof(int));
00090 if (rep_host>0 && rep_host<TotalNodes && !(child_ready[rep_host])) {
00091 child_ready[rep_host] = 1;
00092 reported++;
00093 INFOMSG("CommACLMPL: Child " << rep_host << " ready." << endl);
00094 }
00095 else {
00096 ERRORMSG("CommACLMPL: Error with child reporting to parent. ");
00097 ERRORMSG("rep_host = " << rep_host);
00098 }
00099 ierror = 0;
00100 }
00101 else {
00102 ERRORMSG("CommACLMPL: Message from child has size " << ierror << endl);
00103 }
00104 }
00105
00106 delete [] child_ready;
00107 INFOMSG("CommACLMPL: Initialization complete." << endl);
00108 } else {
00109
00110 int checknode;
00111 ierror = ACLMPL_recv(0, recbuff, msize);
00112 if (ierror>0) {
00113 pos = recbuff;
00114 unpack(pos,&checknode,sizeof(int));
00115 if (checknode != 0)
00116 WARNMSG("CommACLMPL: Child received bad message in startup." << endl);
00117 } else {
00118 WARNMSG("CommACLMPL: Child received message of size " << ierror << endl);
00119 }
00120
00121
00122 pos = sendbuf;
00123 pack(&myHost,pos,sizeof(int));
00124 ACLMPL_send(0,sendbuf,msize);
00125 }
00126
00127 freebuffer(sendbuf);
00128 freebuffer(recbuff);
00129
00130
00131 if (TotalNodes > 1) {
00132 vector<int> proccount;
00133 proccount.push_back(1);
00134 for (int i = 1; i < TotalNodes; i++) {
00135 Contexts.push_back(1);
00136 Processes.push_back(proccount);
00137 }
00138 }
00139
00140 }
00141
00142
00144
00145 CommACLMPL::~CommACLMPL(void) {
00146 TAU_PROFILE("CommACLMPL::~CommACLMPL()", "void ()", TAU_MESSAGE);
00147
00148
00149
00150 if (TotalNodes > 1) {
00151 int trial, node, tag;
00152 Message *msg;
00153 for (trial = 0; trial < 50000; ++trial) {
00154 do {
00155 node = COMM_ANY_NODE;
00156 tag = COMM_ANY_TAG;
00157 msg = myreceive(node, tag, COMM_SEND_TAG);
00158 if (msg != 0 && tag != IPPL_ABORT_TAG && tag != IPPL_EXIT_TAG) {
00159 WARNMSG("CommACLMPL: Found extra message from node " << node);
00160 WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
00161 }
00162 } while (msg != 0);
00163 }
00164 }
00165 }
00166
00167
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179 AsyncMsgInfo *
00180 CommACLMPL::pack_message(Message *msg, int tag, int &buffsize, int node) {
00181 TAU_PROFILE("CommACLMPL::pack_message()", "(Message *, int, int, int)",
00182 TAU_MESSAGE);
00183
00184
00185 buffsize = find_msg_length(*msg);
00186
00187
00188 AsyncMsgInfo *outbuffer = ACLMPL_asend_alloc(buffsize);
00189 void *pos = ACLMPL_agetbuffer(outbuffer);
00190
00191
00192 fill_msg_buffer(pos, *msg, tag, buffsize, node);
00193 return outbuffer;
00194 }
00195
00196
00198
00199
00200
00201
00202
00203
00204 bool CommACLMPL::mysend(Message* msg, int node, int tag, int etag) {
00205 TAU_TYPE_STRING(taustr, "bool (Message *, int, int, int)");
00206 TAU_PROFILE("CommACLMPL::mysend()", taustr, TAU_MESSAGE);
00207 TAU_PROFILE_TIMER(packtimer, " ACLMPL::mysend::pack", taustr, TAU_MESSAGE);
00208 TAU_PROFILE_TIMER(sendtimer, " ACLMPL::mysend::send", taustr, TAU_MESSAGE);
00209
00210
00211 int size = 0;
00212
00213
00214 TAU_PROFILE_START(packtimer);
00215 AsyncMsgInfo *outbuffer = pack_message(msg, tag, size, node);
00216 TAU_PROFILE_STOP(packtimer);
00217
00218 TAU_PROFILE_START(sendtimer);
00219 int sentsize = ACLMPL_alink(node, etag, outbuffer);
00220 TAU_PROFILE_STOP(sendtimer);
00221
00222
00223 if (sentsize == 0)
00224 ERRORMSG("CommACLMPL: Failure during asynchronous send!" << endl);
00225 return (sentsize != 0);
00226 }
00227
00228
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240 Message* CommACLMPL::myreceive(int& node, int& tag, int etag) {
00241 TAU_TYPE_STRING(taustr, "Message *(int, int, int)" );
00242 TAU_PROFILE("CommACLMPL::myreceive()", taustr, TAU_MESSAGE);
00243 TAU_PROFILE_TIMER(findtimer, " ACLMPL::myrec::find", taustr, TAU_MESSAGE);
00244 TAU_PROFILE_TIMER(infotimer, " ACLMPL::myrec::getinfo",taustr, TAU_MESSAGE);
00245 TAU_PROFILE_TIMER(unpktimer, " ACLMPL::myrec::unpack", taustr, TAU_MESSAGE);
00246
00247 int checknode = (node < 0 || node >= TotalNodes ? ACLMPL_ANY_PE : node);
00248 int checktag = etag;
00249
00250
00251 Message* newmsg = NULL;
00252 TAU_PROFILE_START(findtimer);
00253 AsyncMsgInfo *msg;
00254 msg = ACLMPL_afind(checknode, checktag);
00255 TAU_PROFILE_STOP(findtimer);
00256
00257 if (msg != 0) {
00258 checknode = ACLMPL_agetsource(msg);
00259 checktag = ACLMPL_agettag(msg);
00260
00261
00262 TAU_PROFILE_START(unpktimer);
00263
00264
00265
00266 void *outbuff = ACLMPL_agetbuffer(msg);
00267
00268
00269 newmsg = unpack_message(node, tag, outbuff);
00270
00271 if (newmsg == 0) {
00272
00273
00274 cleanupMessage(static_cast<void *>(msg));
00275 } else {
00276
00277
00278
00279
00280 newmsg->useCommunicate(this, static_cast<void *>(msg));
00281 }
00282
00283 TAU_PROFILE_STOP(unpktimer);
00284 }
00285 else {
00286
00287 DEBUGMSG(level2 << "CommACLMPL: No Message Found to Match Request!" << endl);
00288 }
00289
00290
00291 return newmsg;
00292 }
00293
00294
00296
00297
00298
00299 void CommACLMPL::mybarrier(void) {
00300 TAU_PROFILE("CommACLMPL::mybarrier()", "void ()", TAU_MESSAGE);
00301
00302 ACLMPL_barrier();
00303 }
00304
00305
00307
00308
00309 bool CommACLMPL::resend(void *buf, int buffsize, int node, int etag) {
00310 TAU_PROFILE("CommACLMPL::resend()", "void (void *, int, int, int)",
00311 TAU_MESSAGE);
00312
00313
00314
00315
00316
00317
00318 PInsist(node != myNode(), "Can only retransmit to other nodes");
00319
00320
00321 AsyncMsgInfo *outbuffer = ACLMPL_asend_alloc(buffsize);
00322 void *pos = ACLMPL_agetbuffer(outbuffer);
00323
00324
00325 memcpy(pos, buf, buffsize);
00326
00327
00328 int sentsize = ACLMPL_alink(node, etag, outbuffer);
00329
00330
00331
00332
00333 if (sentsize == 0)
00334 ERRORMSG("CommACLMPL: Failure during asynchronous resend!" << endl);
00335 return (sentsize != 0);
00336 }
00337
00338
00340
00341 void CommACLMPL::cleanupMessage(void *d) {
00342 TAU_PROFILE("CommACLMPL::cleanupMessage()", "void (void *)", TAU_MESSAGE);
00343
00344
00345 ACLMPL_afree(static_cast<AsyncMsgInfo *>(d));
00346 }
00347
00348
00349
00350
00351
00352
00353