51 #define PM_MTU (8192 + 32)
63 #define MAX_MPI_ERRS 500
67 static int size_of_MPI_INT;
70 static pmCtx pm_network;
82 int i, reported, rep_host, ierror, result_len;
84 char *currtok, *nexttok, *execname;
87 currtok = strstr(argv[0],
"/");
90 execname = strdup(argv[0]);
95 nexttok = strstr(currtok,
"/");
99 nexttok = strstr(currtok,
"/");
101 execname = strdup(currtok);
105 if (_score_alloc_subnet())
107 ERRORMSG(
"CommPM: Error with allocating sub-network.");
112 MPI_Init(&argc, &argv);
115 strcpy(argv[0],execname);
119 MPI_Comm_rank(MPI_COMM_WORLD,&
myHost);
130 MPI_Type_size ( MPI_INT, (MPI_Aint*)&size_of_MPI_INT );
142 std::vector<int> child_ready(TotalNodes);
143 for (i = 0; i <
TotalNodes; child_ready[i++] = 0);
144 INFOMSG(
"CommPM: Parent process waiting for children ..." <<
endl);
146 while (reported < TotalNodes)
148 ierror = MPI_Recv(&rep_host, 1, MPI_INT, MPI_ANY_SOURCE,
151 if (rep_host >= 0 && rep_host < TotalNodes && !(child_ready[rep_host]))
153 child_ready[rep_host] = 1;
155 INFOMSG(
"CommPM: Child " << rep_host <<
" ready." <<
endl);
159 ERRORMSG(
"CommPM: Error with child reporting to parent. ");
160 ERRORMSG(
"rep_host = " << rep_host);
161 ERRORMSG(
", child_ready[] = " << child_ready[rep_host] <<
endl);
166 INFOMSG(
"CommPM: Initialization complete." <<
endl);
171 char host_name[MPI_MAX_PROCESSOR_NAME];
172 ierror = MPI_Get_processor_name(host_name, &result_len);
185 MPI_Recv(&checknode, 1, MPI_INT, 0,
COMM_HOSTS_TAG, MPI_COMM_WORLD,
189 WARNMSG(
"CommPM: Child received bad message during startup." <<
endl);
199 vector<int> proccount;
200 proccount.push_back(1);
223 int trial, node, tag;
225 for (trial = 0; trial < 50000; ++trial)
234 WARNMSG(
"CommPM: Found extra message from node " << node);
235 WARNMSG(
", tag " << tag <<
": msg = " << *msg <<
endl);
248 MPI_Send(&dieCode, 1, MPI_INT, i,
COMM_DIE_TAG, MPI_COMM_WORLD);
255 MPI_Recv(&dieCode, 1, MPI_INT, 0,
COMM_DIE_TAG, MPI_COMM_WORLD, &stat);
260 _score_free_subnet();
284 void *CommPM::pack_message(
Message *msg,
int tag,
int &buffsize)
320 int timeout_counter = 0;
321 while (
_pmGetSendBuf(pm_network, (caddr_t*)&msgbuf, length) == ENOBUFS)
324 if (timeout_counter > 10000000)
326 ERRORMSG(
"CommPM: _pmGetSendBuf TIMEOUT");
367 int src_node, rec_size, rec_utag;
371 if (error =
_pmReceive(pm_network, (caddr_t*)&msgbuf, (
size_t*)&length))
374 if (error != ENOBUFS)
376 ERRORMSG(
"CommPM: _pmReceive Error (in pickup_message)");
384 src_node = msgbuf->
node;
389 memcpy(rec_buff, (
void*) (&(msgbuf->
tag) + 1), rec_size);
395 if (
add_msg(newmsg, src_node, rec_utag))
423 int rec_tag, src_node, rec_size, rec_utag;
428 if (error =
_pmReceive(pm_network, (caddr_t*)&msgbuf, (
size_t*)&length))
431 if (error != ENOBUFS)
433 ERRORMSG(
"CommPM: _pmReceive Error (in myreceive)");
445 src_node = msgbuf->
node;
446 rec_tag = msgbuf->
tag;
450 memcpy(rec_buff, (
void*) (&(msgbuf->
tag) + 1), rec_size);
471 MPI_Barrier(MPI_COMM_WORLD);
static void abort(const char *=0, int exitcode=(-1))
virtual bool mysend(Message *, int node, int utag, int etag)
CommPM(int &argc, char **&argv, int procs=(-1))
bool add_msg(Message *, int, int)
void fill_msg_buffer(void *, Message &, int, int, int)
std::vector< std::vector< int > > Processes
int _pmGetSendBuf(pmCtx pmc, caddr_t *bufp, size_t length)
std::vector< int > Contexts
int _pmSendDone(pmCtx pmc)
int find_msg_length(Message &)
Inform & level2(Inform &inf)
int _pmPutReceiveBuf(pmCtx pmc)
int _pmSend(pmCtx pmc, int dst_node)
int _pmReceive(pmCtx pmc, caddr_t *bufp, size_t *length)
static void * makebuffer(int size)
void useCommunicate(Communicate *c, void *d)
static void freebuffer(void *buf)
virtual void cleanupMessage(void *)
virtual Message * myreceive(int &node, int &tag, int etag)
Message * unpack_message(int &node, int &tag, void *pos)
Inform & endl(Inform &inf)
virtual void mybarrier(void)