00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "Message/CommSHMEMPI.h"
00028 #include "Message/Message.h"
00029 #include "Utility/IpplInfo.h"
00030 #include "Profile/Profiler.h"
00031
00032
00033 #include <mpi.h>
00034
00035 #include <string.h>
00036 #include <stdlib.h>
00037 #include <unistd.h>
00038
00039
00040
00041
00042
// Upper bound on consecutive failed MPI probes tolerated by
// CommSHMEMPI::myreceive before it calls Ippl::abort().
#define MAX_SHMEMPI_ERRS 500

// Number of consecutive receive (probe) errors seen by myreceive; it is
// reset to zero after every successfully received message.
static int numErrors = 0;
// Size in bytes of MPI_INT, queried once in the constructor and used when
// reporting message sizes to the TAU tracing macros.
static int size_of_SHMEMPI_INT;

// Static pack buffer reused by pack_message for outgoing messages of at most
// PACKSIZE bytes; larger messages get a dedicated buffer from makebuffer().
// PSIZE is fully parenthesized so it expands correctly inside any
// expression (e.g. `n / PSIZE`), not only inside PACKSIZE.
#define PSIZE (1024*16)
#define PACKSIZE ((PSIZE)*sizeof(long))
static long shmempipackbuf[PSIZE];
00054
00055
00056
00058
00059
00060
00061
00062 CommSHMEMPI::CommSHMEMPI(int& argc , char**& argv, int procs)
00063 : Communicate(argc, argv, procs) {
00064 TAU_PROFILE("CommSHMEMPI::CommSHMEMPI()", "void (int, char **, int)",
00065 TAU_MESSAGE);
00066
00067 int i, reported, rep_host, ierror, result_len;
00068 MPI_Status stat;
00069 char *currtok, *nexttok, *execname;
00070
00071
00072 currtok = strstr(argv[0],"/");
00073 if (!currtok) {
00074 execname = strdup(argv[0]);
00075 } else {
00076 currtok++;
00077 nexttok = strstr(currtok,"/");
00078 while (nexttok) {
00079 currtok = nexttok+1;
00080 nexttok = strstr(currtok,"/");
00081 }
00082 execname = strdup(currtok);
00083 }
00084
00085
00086 MPI_Init(&argc, &argv);
00087
00088
00089 strcpy(argv[0],execname);
00090
00091
00092 MPI_Comm_size(MPI_COMM_WORLD,&TotalNodes);
00093 MPI_Comm_rank(MPI_COMM_WORLD,&myHost);
00094
00095
00096 if (procs > 0 && procs < TotalNodes) {
00097
00098 if (myHost >= procs)
00099 Ippl::abort();
00100 TotalNodes = procs;
00101 }
00102
00103 MPI_Type_size ( MPI_INT, &size_of_SHMEMPI_INT );
00104 if (myHost == 0) {
00105
00106 for (i = 1; i < TotalNodes; i++)
00107 {
00108 MPI_Send(&myHost, 1, MPI_INT, i, COMM_HOSTS_TAG, MPI_COMM_WORLD);
00109 TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, i, 1*size_of_SHMEMPI_INT);
00110 }
00111
00112
00113 int *child_ready = new int[TotalNodes];
00114 for (i = 0; i < TotalNodes; child_ready[i++] = 0);
00115 INFOMSG("CommSHMEMPI: Parent process waiting for children ..." << endl);
00116 reported = 1;
00117 while (reported < TotalNodes) {
00118 ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
00119 COMM_HOSTS_TAG, MPI_COMM_WORLD, &stat);
00120 TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, stat.MPI_SOURCE,
00121 1*size_of_SHMEMPI_INT);
00122 if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host])) {
00123 child_ready[rep_host] = 1;
00124 reported++;
00125 INFOMSG("CommSHMEMPI: Child " << rep_host << " ready." << endl);
00126 }
00127 else {
00128 ERRORMSG("CommSHMEMPI: Error with child reporting to parent. ");
00129 ERRORMSG("rep_host = " << rep_host);
00130 ERRORMSG(", child_ready[] = " << child_ready[rep_host] << endl);
00131 }
00132 }
00133
00134 delete [] child_ready;
00135 INFOMSG("CommSHMEMPI: Initialization complete." << endl);
00136
00137 }
00138 else {
00139 char host_name[MPI_MAX_PROCESSOR_NAME];
00140 ierror = MPI_Get_processor_name(host_name, &result_len);
00141 if (ierror >= 0) {
00142 INFOMSG("CommSHMEMPI: Started job " << myHost << " on host `");
00143 INFOMSG(host_name << "'." << endl);
00144 }
00145 else {
00146 ERRORMSG("CommSHMEMPI: failed" << endl);
00147 }
00148
00149
00150 int checknode;
00151 MPI_Recv(&checknode, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD,
00152 &stat);
00153 TAU_TRACE_RECVMSG(COMM_HOSTS_TAG, 0, 1*size_of_SHMEMPI_INT);
00154 if (checknode != 0)
00155 WARNMSG("CommSHMEMPI: Child received bad message during startup." << endl);
00156
00157
00158 MPI_Send(&myHost, 1, MPI_INT, 0, COMM_HOSTS_TAG, MPI_COMM_WORLD);
00159 TAU_TRACE_SENDMSG(COMM_HOSTS_TAG, 0, 1*size_of_SHMEMPI_INT);
00160 }
00161
00162
00163 if (TotalNodes > 1) {
00164 vector<int> proccount;
00165 proccount.push_back(1);
00166 for (i = 1; i < TotalNodes; i++) {
00167 Contexts.push_back(1);
00168 Processes.push_back(proccount);
00169 }
00170 }
00171 }
00172
00173
00175
00176 CommSHMEMPI::~CommSHMEMPI(void) {
00177 TAU_PROFILE("CommSHMEMPI::~CommSHMEMPI()", "void()", TAU_MESSAGE);
00178 int i, dieCode = 0;
00179 MPI_Status stat;
00180
00181
00182
00183 if (TotalNodes > 1) {
00184 int trial, node, tag;
00185 Message *msg;
00186 for (trial = 0; trial < 50000; ++trial) {
00187 do {
00188 node = COMM_ANY_NODE;
00189 tag = COMM_ANY_TAG;
00190 msg = myreceive(node, tag, COMM_SEND_TAG);
00191 if (msg != 0 && tag != IPPL_ABORT_TAG && tag != IPPL_EXIT_TAG) {
00192 WARNMSG("CommSHMEMPI: Found extra message from node " << node);
00193 WARNMSG(", tag " << tag << ": msg = " << *msg << endl);
00194 }
00195 } while (msg != 0);
00196 }
00197 }
00198
00199
00200 if (myNode() == 0) {
00201
00202 for (i = 1; i < TotalNodes; i++)
00203 {
00204 MPI_Send(&dieCode, 1, MPI_INT, i, COMM_DIE_TAG, MPI_COMM_WORLD);
00205 TAU_TRACE_SENDMSG(COMM_DIE_TAG, i, 1*size_of_SHMEMPI_INT);
00206 }
00207 } else {
00208
00209 MPI_Recv(&dieCode, 1, MPI_INT, 0, COMM_DIE_TAG, MPI_COMM_WORLD, &stat);
00210 TAU_TRACE_RECVMSG(COMM_DIE_TAG, 0, 1*size_of_SHMEMPI_INT);
00211 }
00212
00213 MPI_Finalize();
00214 }
00215
00216
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228 void *CommSHMEMPI::pack_message(Message *msg, int tag, int &buffsize) {
00229 TAU_PROFILE("CommSHMEMPI::pack_message()", "(Message *, int, int)",
00230 TAU_MESSAGE);
00231
00232 buffsize = find_msg_length(*msg);
00233
00234
00235 void *pos = (buffsize > PACKSIZE) ? makebuffer(buffsize) : shmempipackbuf;
00236
00237
00238 fill_msg_buffer(pos, *msg, tag, buffsize);
00239 return pos;
00240 }
00241
00242
00244
00245
00246
00247
00248
00249
00250 bool CommSHMEMPI::mysend(Message *msg, int node, int tag, int etag) {
00251 TAU_PROFILE("CommSHMEMPI::mysend()", "bool (Message *, int, int, int)",
00252 TAU_MESSAGE);
00253
00254 int nitems = msg->size();
00255 int errstat = (-1);
00256 int flag = false;
00257 MPI_Request request;
00258 MPI_Status status;
00259
00260 MPI_Status rec_status;
00261 int src_node, rec_node, rec_tag, rec_size, rec_utag, bufid, rec_flag = 0;
00262 Message* newmsg = NULL;
00263
00264
00265 int size;
00266 void *outbuffer = pack_message(msg, tag, size);
00267
00268
00269 errstat = MPI_Isend(outbuffer, size, MPI_BYTE, node, etag,
00270 MPI_COMM_WORLD, &request);
00271 TAU_TRACE_SENDMSG(etag, node, size);
00272
00273 while (!flag) {
00274
00275 bufid = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
00276 &rec_flag, &rec_status);
00277 if ( (bufid >= 0) && (rec_flag != 0) ) {
00278
00279 src_node = rec_status.MPI_SOURCE;
00280 rec_tag = rec_status.MPI_TAG;
00281 MPI_Get_count(&rec_status, MPI_BYTE, &rec_size);
00282 if ( (rec_size >= 0) && (rec_tag >= 0) && (src_node >= 0) ) {
00283
00284 void *rec_buff = makebuffer(rec_size);
00285
00286
00287 MPI_Recv(rec_buff, rec_size, MPI_BYTE, src_node, rec_tag,
00288 MPI_COMM_WORLD, &rec_status);
00289 TAU_TRACE_RECVMSG(rec_tag, src_node, rec_size);
00290 newmsg = unpack_message(rec_node, rec_utag, rec_buff);
00291
00292
00293
00294
00295 newmsg->useCommunicate(this, rec_buff);
00296
00297
00298 if (add_msg(newmsg,rec_node,rec_utag)) {
00299 newmsg = NULL;
00300 rec_flag = 0;
00301 }
00302 }
00303 }
00304
00305
00306 MPI_Test(&request, &flag, &status);
00307 }
00308
00309
00310 if (size > PACKSIZE)
00311 freebuffer(outbuffer);
00312
00313
00314 return (errstat == 0);
00315 }
00316
00317
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329 Message *CommSHMEMPI::myreceive(int& node, int& tag, int etag) {
00330 TAU_PROFILE("CommSHMEMPI::myreceive()", "Message *(int, int, int)",
00331 TAU_MESSAGE);
00332
00333 int bufid, size, checknode, checktag, flag = false;
00334 Message *newmsg = 0;
00335 MPI_Status stat;
00336
00337 checknode = (node < 0 || node >= TotalNodes ? MPI_ANY_SOURCE : node);
00338 checktag = etag;
00339
00340
00341 bufid = MPI_Iprobe(checknode, checktag, MPI_COMM_WORLD, &flag, &stat);
00342 if (bufid < 0) {
00343
00344 ERRORMSG("CommSHMEMPI: cannot receive msg from node " << checknode);
00345 ERRORMSG(", tag " << checktag << endl);
00346
00347 if (MAX_SHMEMPI_ERRS > 0 && ++numErrors > MAX_SHMEMPI_ERRS) {
00348 ERRORMSG("Maximum number of MPI receive errors (" << numErrors);
00349 ERRORMSG(") exceeded. MPI is hosed!!" << endl);
00350 Ippl::abort();
00351 }
00352 }
00353
00354
00355 if (flag == true) {
00356 MPI_Get_count(&stat,MPI_BYTE,&size);
00357 if (size < 0) {
00358 ERRORMSG("CommSHMEMPI: received message has size " << size << endl);
00359 } else if ((stat.MPI_TAG != checktag) || (stat.MPI_TAG < 0)) {
00360 ERRORMSG("CommSHMEMPI: received message with invalid tag ");
00361 ERRORMSG(stat.MPI_TAG << endl);
00362 } else if (stat.MPI_SOURCE < 0) {
00363 ERRORMSG("CommSHMEMPI: received message from invalid source ");
00364 ERRORMSG(stat.MPI_SOURCE << endl);
00365 } else {
00366 checknode = stat.MPI_SOURCE;
00367 checktag = stat.MPI_TAG;
00368
00369
00370 void *outbuff = makebuffer(size);
00371
00372
00373 MPI_Recv(outbuff, size, MPI_BYTE, checknode, checktag,
00374 MPI_COMM_WORLD, &stat);
00375 TAU_TRACE_RECVMSG(checktag, checknode, size);
00376 newmsg = unpack_message(node, tag, outbuff);
00377
00378
00379
00380
00381 newmsg->useCommunicate(this, outbuff);
00382 numErrors = 0;
00383 }
00384 } else {
00385
00386 DEBUGMSG(level2 << "CommSHMEMPI: No Message Received to Match Request");
00387 DEBUGMSG(endl);
00388 }
00389
00390
00391 return newmsg;
00392 }
00393
00394
00396
00397
00398
00399 void CommSHMEMPI::mybarrier(void) {
00400 TAU_PROFILE("CommSHMEMPI::mybarrier()", "void ()", TAU_MESSAGE);
00401
00402 MPI_Barrier(MPI_COMM_WORLD);
00403 }
00404
00405
00407
00408 void CommSHMEMPI::cleanupMessage(void *d) {
00409 TAU_PROFILE("CommSHMEMPI::cleanupMessage()", "void (void *)", TAU_MESSAGE);
00410
00411
00412 freebuffer(d);
00413 }
00414
00415
00416
00417
00418
00419
00420