44#define MAX_MPI_ERRS 500
48static int numErrors = 0;
49static int size_of_MPI_INT;
53#define PACKSIZE ((PSIZE)*sizeof(long))
54static long mpipackbuf[
PSIZE];
68 :
Communicate(argc, argv, procs), weInitialized(mpiinit)
71 int i, reported, rep_host, ierror, result_len;
73 char *currtok, *nexttok, *execname;
76 currtok = strstr(argv[0],
"/");
79 execname = strdup(argv[0]);
84 nexttok = strstr(currtok,
"/");
88 nexttok = strstr(currtok,
"/");
90 execname = strdup(currtok);
97 MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
98 INFOMSG(
"Ippl will be initialized with " <<
99 omp_get_max_threads() <<
" OMP threads\n");
101 if ( provided != MPI_THREAD_FUNNELED )
102 ERRORMSG(
"CommMPI: Didn't get requested MPI-OpenMP setting.\n");
104 MPI_Init(&argc, &argv);
111 strcpy(argv[0],execname);
132 MPI_Type_size ( MPI_INT, &size_of_MPI_INT );
144 for (i = 0; i <
TotalNodes; child_ready[i++] = 0)
150 ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
153 if (rep_host >= 0 && rep_host <
TotalNodes && !(child_ready[rep_host]))
155 child_ready[rep_host] = 1;
161 ERRORMSG(
"CommMPI: Error with child reporting to parent. ");
162 ERRORMSG(
"rep_host = " << rep_host);
163 ERRORMSG(
", child_ready[] = " << child_ready[rep_host] <<
endl);
173 char host_name[MPI_MAX_PROCESSOR_NAME];
174 ierror = MPI_Get_processor_name(host_name, &result_len);
191 WARNMSG(
"CommMPI: Child received bad message during startup." <<
endl);
201 std::vector<int> proccount;
202 proccount.push_back(1);
225 int trial, node, tag;
227 for (trial = 0; trial < 50000; ++trial)
236 WARNMSG(
"CommMPI: Found extra message from node " << node);
237 WARNMSG(
", tag " << tag <<
": msg = " << *msg <<
endl);
319 MPI_Status rec_status;
320 int src_node, rec_node, rec_tag, rec_size, rec_utag, bufid, rec_flag = 0;
335 errstat = MPI_Isend(outbuffer, size, MPI_BYTE, node, etag,
344 bufid = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG,
communicator,
345 &rec_flag, &rec_status);
346 if ((bufid >= 0) && (rec_flag != 0) )
349 src_node = rec_status.MPI_SOURCE;
350 rec_tag = rec_status.MPI_TAG;
351 MPI_Get_count(&rec_status, MPI_BYTE, &rec_size);
354 if ( (rec_size >= 0) && (rec_tag >= 0) && (src_node >= 0) )
360 MPI_Recv(rec_buff, rec_size, MPI_BYTE, src_node, rec_tag,
379 add_msg(newmsg, rec_node, rec_utag);
390 MPI_Test(&request, &flag, &status);
398 return (errstat == 0);
416 int bufid, size, checknode, checktag, flag =
false;
420 checknode = (node < 0 || node >=
TotalNodes ? MPI_ANY_SOURCE : node);
424 bufid = MPI_Iprobe(checknode, checktag,
communicator, &flag, &stat);
428 ERRORMSG(
"CommMPI: cannot receive msg from node " << checknode);
433 ERRORMSG(
"Maximum number of MPI receive errors (" << numErrors);
442 MPI_Get_count(&stat,MPI_BYTE,&size);
445 ERRORMSG(
"CommMPI: received message has size " << size <<
endl);
447 else if ((stat.MPI_TAG != checktag) || (stat.MPI_TAG < 0))
449 ERRORMSG(
"CommMPI: received message with invalid tag ");
452 else if (stat.MPI_SOURCE < 0)
454 ERRORMSG(
"CommMPI: received message from invalid source ");
459 checknode = stat.MPI_SOURCE;
460 checktag = stat.MPI_TAG;
469 MPI_Recv(outbuff, size, MPI_BYTE, checknode, checktag,
519 PInsist(node !=
myNode(),
"Can only retransmit to other nodes");
525 int errstat = MPI_Isend(buf, buffsize, MPI_BYTE, node, etag,
534 MPI_Test(&request, &flag, &status);
538 return (errstat == 0);
555 return MPI_Send(data, size, MPI_BYTE, node, tag,
communicator)
565 MPI_Isend(data, size, MPI_BYTE, node, tag,
communicator, &request);
572 node = MPI_ANY_SOURCE;
577 MPI_Recv(data, size, MPI_BYTE, node, tag,
communicator, &stat);
579 node = stat.MPI_SOURCE;
582 MPI_Get_count(&stat, MPI_BYTE, &count);
589 node = MPI_ANY_SOURCE;
594 MPI_Irecv(buf, size, MPI_BYTE, node, tag,
communicator, &request);
602 node = MPI_ANY_SOURCE;
609 MPI_Get_count(&stat, MPI_BYTE, &count);
611 data =
new char[count];
614 node = stat.MPI_SOURCE;
617 MPI_Recv(data, count, MPI_BYTE, node, tag,
communicator, &stat);
Inform & endl(Inform &inf)
Inform & level5(Inform &inf)
CommMPI(int &argc, char **&argv, int procs=(-1), bool mpiinit=true, MPI_Comm mpicomm=MPI_COMM_WORLD)
virtual bool raw_send(void *, int size, int node, int tag)
virtual bool resend(void *buf, int size, int node, int etag)
virtual MPI_Request raw_isend(void *, int size, int node, int tag)
virtual Message * myreceive(int &node, int &tag, int etag)
virtual void mybarrier(void)
virtual MPI_Request raw_ireceive(char *buf, int size, int node, int tag)
virtual int raw_probe_receive(char *&, int &node, int &tag)
virtual bool mysend(Message *, int node, int utag, int etag)
void * pack_message(Message *msg, int tag, int &buffsize, int node)
virtual void cleanupMessage(void *)
virtual int raw_receive(char *, int size, int &node, int &tag)
std::vector< int > Contexts
static void * makebuffer(int size)
static void freebuffer(void *buf)
int find_msg_length(Message &)
std::vector< std::vector< int > > Processes
bool add_msg(Message *, int, int)
Message * unpack_message(int &node, int &tag, void *pos)
void fill_msg_buffer(void *, Message &, int, int, int)
void useCommunicate(Communicate *c, void *d)
static void abort(const char *=0)
static IpplMessageCounter & getInstance()
void registerMessage(int)