// File-scope constants and state for this translation unit.
// NOTE(review): this listing is sparse -- the embedded source line numbers
// skip, so the #else/#endif of the conditional below and the definition of
// PSIZE are not visible here.
50 #define MAX_MPI_ERRS 500
// Running error count, printed in the "Maximum number of MPI receive errors"
// message further down. Presumably compared against MAX_MPI_ERRS -- confirm.
54 static int numErrors = 0;
55 #if defined(IPPL_MPIXX)
// Storage for the result of MPI_Type_size(MPI_INT, ...). Declared as MPI_Aint
// when IPPL_MPIXX is defined, and as int in the other declaration below.
56 static MPI_Aint size_of_MPI_INT;
58 static int size_of_MPI_INT;
// PACKSIZE is the size in bytes of mpipackbuf (PSIZE elements of type long).
63 #define PACKSIZE ((PSIZE)*sizeof(long))
64 static long mpipackbuf[
PSIZE];
// CommMPI constructor (fragmentary listing; only some statements are visible).
// The initializer list forwards argc, argv and procs to the Communicate base
// and stores mpiinit in weInitialized.
78 :
Communicate(argc, argv, procs), weInitialized(mpiinit)
81 int i, reported, rep_host, ierror, result_len;
83 char *currtok, *nexttok, *execname;
// Locate the first "/" in argv[0]; presumably the start of stripping the
// directory part from the executable path -- TODO confirm against full source.
86 currtok = strstr(argv[0],
"/");
// execname is a strdup() heap copy. NOTE(review): no matching free() is
// visible in this listing -- confirm it is released.
89 execname = strdup(argv[0]);
94 nexttok = strstr(currtok,
"/");
98 nexttok = strstr(currtok,
"/");
100 execname = strdup(currtok);
// Initialize MPI asking for MPI_THREAD_FUNNELED; the granted level is written
// to `provided`.
107 MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
108 INFOMSG(
"Ippl will be initialized with " <<
109 omp_get_max_threads() <<
" OMP threads\n");
// Complain when the granted thread level is not the one requested.
111 if ( provided != MPI_THREAD_FUNNELED )
112 ERRORMSG(
"CommMPI: Didn't get requested MPI-OpenMP setting.\n");
// Non-threaded initialization; presumably the alternative to the
// MPI_Init_thread call above under a preprocessor guard not visible here.
114 MPI_Init(&argc, &argv);
// Copy the saved executable name back into argv[0].
// NOTE(review): strcpy does no bounds check; this relies on execname fitting
// in the argv[0] buffer -- confirm.
121 strcpy(argv[0],execname);
// Store the size of MPI_INT in the file-scope size_of_MPI_INT.
142 MPI_Type_size ( MPI_INT, &size_of_MPI_INT );
// One flag per node. The vector constructor already value-initializes every
// element to 0; the loop below assigns 0 again in its increment expression.
153 std::vector<int> child_ready(TotalNodes);
154 for (i = 0; i <
TotalNodes; child_ready[i++] = 0)
156 INFOMSG(
"CommMPI: Parent process waiting for children ..." <<
endl);
// Keep going until `reported` reaches TotalNodes (its initial value and its
// increment are not visible in this listing).
158 while (reported < TotalNodes)
160 ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
// Take this branch only for an in-range node index whose flag is still 0.
163 if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host]))
165 child_ready[rep_host] = 1;
167 INFOMSG(
"CommMPI: Child " << rep_host <<
" ready." <<
endl);
// NOTE(review): if this is the else-branch of the range check above, then
// child_ready[rep_host] in the third message below can be indexed with an
// out-of-range rep_host -- confirm and guard.
171 ERRORMSG(
"CommMPI: Error with child reporting to parent. ");
172 ERRORMSG(
"rep_host = " << rep_host);
173 ERRORMSG(
", child_ready[] = " << child_ready[rep_host] <<
endl);
178 INFOMSG(
"CommMPI: Initialization complete." <<
endl);
// Buffer sized by MPI_MAX_PROCESSOR_NAME; MPI_Get_processor_name fills it and
// writes the name length to result_len.
183 char host_name[MPI_MAX_PROCESSOR_NAME];
184 ierror = MPI_Get_processor_name(host_name, &result_len);
201 WARNMSG(
"CommMPI: Child received bad message during startup." <<
endl);
// proccount begins with a single element of value 1.
211 std::vector<int> proccount;
212 proccount.push_back(1);
// Bounded loop (at most 50000 trials) with a warning that reports an extra
// message: the sending node, the tag and the message contents are printed.
// NOTE(review): the enclosing function and the call that fills node/tag/msg
// are not visible here; presumably this drains leftover messages -- confirm.
235 int trial, node, tag;
237 for (trial = 0; trial < 50000; ++trial)
246 WARNMSG(
"CommMPI: Found extra message from node " << node);
247 WARNMSG(
", tag " << tag <<
": msg = " << *msg <<
endl);
// Fragment of a send routine -- presumably mysend(Message *, int node,
// int utag, int etag), judging by the etag argument and the boolean return
// expression; confirm against the full source.
329 MPI_Status rec_status;
330 int src_node, rec_node, rec_tag, rec_size, rec_utag, bufid, rec_flag = 0;
// Non-blocking send of `size` bytes from outbuffer to `node`, tagged etag.
345 errstat = MPI_Isend(outbuffer, size, MPI_BYTE, node, etag,
// Non-blocking probe for a message from any source with any tag; rec_flag is
// set when one is pending and rec_status describes it.
// NOTE(review): MPI_Iprobe returns an MPI error code, so `bufid` holds a
// status value and the `bufid >= 0` test below is unlikely to detect a
// failure -- confirm against the MPI implementation in use.
354 bufid = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG,
communicator,
355 &rec_flag, &rec_status);
356 if ((bufid >= 0) && (rec_flag != 0) )
// Read the sender, tag and byte count of the pending message from its status.
359 src_node = rec_status.MPI_SOURCE;
360 rec_tag = rec_status.MPI_TAG;
361 MPI_Get_count(&rec_status, MPI_BYTE, &rec_size);
// Proceed only when size, tag and source are all non-negative.
364 if ( (rec_size >= 0) && (rec_tag >= 0) && (src_node >= 0) )
370 MPI_Recv(rec_buff, rec_size, MPI_BYTE, src_node, rec_tag,
// Hand the message to add_msg under rec_node / rec_utag (both are assigned on
// lines not visible in this listing).
389 add_msg(newmsg, rec_node, rec_utag);
// Non-blocking completion check on `request`; the result lands in `flag`.
400 MPI_Test(&request, &flag, &status);
// Success is reported as errstat == 0.
408 return (errstat == 0);
// Fragment of a receive routine -- presumably myreceive(int &node, int &tag,
// int etag); confirm against the full source.
426 int bufid, size, checknode, checktag, flag =
false;
// A node index that is negative or >= TotalNodes is replaced by
// MPI_ANY_SOURCE; otherwise the given node is used.
430 checknode = (node < 0 || node >=
TotalNodes ? MPI_ANY_SOURCE : node);
// Non-blocking probe for a message matching checknode/checktag; `flag` and
// `stat` receive the outcome.
434 bufid = MPI_Iprobe(checknode, checktag,
communicator, &flag, &stat);
438 ERRORMSG(
"CommMPI: cannot receive msg from node " << checknode);
443 ERRORMSG(
"Maximum number of MPI receive errors (" << numErrors);
// Message length in bytes, taken from the probe status.
452 MPI_Get_count(&stat,MPI_BYTE,&size);
455 ERRORMSG(
"CommMPI: received message has size " << size <<
endl);
// Error case: the probed tag differs from the requested one or is negative.
457 else if ((stat.MPI_TAG != checktag) || (stat.MPI_TAG < 0))
459 ERRORMSG(
"CommMPI: received message with invalid tag ");
// Error case: the probed source rank is negative.
462 else if (stat.MPI_SOURCE < 0)
464 ERRORMSG(
"CommMPI: received message from invalid source ");
// Use the source and tag reported by the probe for the receive below.
469 checknode = stat.MPI_SOURCE;
470 checktag = stat.MPI_TAG;
479 MPI_Recv(outbuff, size, MPI_BYTE, checknode, checktag,
// Fragment of a retransmit routine -- presumably resend(void *buf, int size,
// int node, int etag); confirm against the full source.
// PInsist checks that the target node is not this node.
534 PInsist(node !=
myNode(),
"Can only retransmit to other nodes");
// Non-blocking send of buffsize bytes from buf to node, tagged etag.
540 int errstat = MPI_Isend(buf, buffsize, MPI_BYTE, node, etag,
// Non-blocking completion check on `request`.
549 MPI_Test(&request, &flag, &status);
// True when MPI_Isend returned 0.
553 return (errstat == 0);
// Fragments of the raw byte-buffer send/receive helpers (function headers and
// braces are not visible in this listing).
// Blocking send of `size` bytes; the MPI return value feeds the returned
// expression, whose remainder is not visible here.
570 return MPI_Send(data, size, MPI_BYTE, node, tag,
communicator)
// Non-blocking send; `request` identifies the pending operation.
580 MPI_Isend(data, size, MPI_BYTE, node, tag,
communicator, &request);
// Switch to a wildcard source (the guarding condition is not visible).
587 node = MPI_ANY_SOURCE;
592 MPI_Recv(data, size, MPI_BYTE, node, tag,
communicator, &stat);
// Report the actual sender back through `node`.
594 node = stat.MPI_SOURCE;
// Byte count of the received message.
597 MPI_Get_count(&stat, MPI_BYTE, &count);
604 node = MPI_ANY_SOURCE;
// Non-blocking receive into buf; `request` identifies the pending operation.
609 MPI_Irecv(buf, size, MPI_BYTE, node, tag,
communicator, &request);
617 node = MPI_ANY_SOURCE;
624 MPI_Get_count(&stat, MPI_BYTE, &count);
// Allocate a buffer of exactly `count` bytes for the message.
// NOTE(review): allocated with new[]; presumably the caller owns it and must
// delete[] it -- confirm.
626 data =
new char[count];
629 node = stat.MPI_SOURCE;
632 MPI_Recv(data, count, MPI_BYTE, node, tag,
communicator, &stat);
virtual MPI_Request raw_ireceive(char *buf, int size, int node, int tag)
virtual void cleanupMessage(void *)
static void abort(const char *=0, int exitcode=(-1))
bool add_msg(Message *, int, int)
void fill_msg_buffer(void *, Message &, int, int, int)
virtual int raw_receive(char *, int size, int &node, int &tag)
std::vector< std::vector< int > > Processes
std::vector< int > Contexts
int find_msg_length(Message &)
void registerMessage(int)
Inform & level2(Inform &inf)
static IpplMessageCounter & getInstance()
virtual int raw_probe_receive(char *&, int &node, int &tag)
static void * makebuffer(int size)
void * pack_message(Message *msg, int tag, int &buffsize, int node)
virtual bool mysend(Message *, int node, int utag, int etag)
void useCommunicate(Communicate *c, void *d)
static void freebuffer(void *buf)
virtual void mybarrier(void)
virtual Message * myreceive(int &node, int &tag, int etag)
virtual MPI_Request raw_isend(void *, int size, int node, int tag)
Message * unpack_message(int &node, int &tag, void *pos)
Inform & endl(Inform &inf)
virtual bool raw_send(void *, int size, int node, int tag)
virtual bool resend(void *buf, int size, int node, int etag)
CommMPI(int &argc, char **&argv, int procs=(-1), bool mpiinit=true, MPI_Comm mpicomm=MPI_COMM_WORLD)