51 o <<
"Parallel communication method: " << c.
name() <<
"\n";
52 o <<
" Total nodes: " << c.
getNodes() <<
", Current node: ";
54 o <<
" Queued received messages: ";
92 (*cachei).second.freebuf();
117 for ( ; qi != qend ; ++qi)
190 if ( node < 0 || node >=
getNodes() || tag < 0 )
192 ERRORMSG(
"Communicate: illegal send node " << node <<
endl);
201 retval =
add_msg(msg, node, tag);
256 if (msg == 0 &&
myNode() != node)
258 int checknode = node;
272 add_msg(msg, checknode, checktag);
331 int checknode = node;
345 add_msg(msg, checknode, checktag);
371 "Local message not found in Communicate::receive_block!!");
390 for (i=(
getNodes() - 1); i >= 0; i--)
419 for (i=(
getNodes() - 1); i >= 0; i--)
461 static const unsigned int intsize4 =
wordround(4 *
sizeof(
int));
462 static const unsigned int intsize2 =
wordround(2 *
sizeof(
int));
466 unsigned int buffsize = longsize + intsize4;
470 int nitems = msg.
size();
471 for (
int i=0; i < nitems; ++i)
485 int bufsize,
int node)
489 int nitems = msg.
size();
501 memset(pos, 0, bufsize);
511 pack(mdata, pos, 4*
sizeof(
int));
514 for (
int i=0; i < nitems; ++i)
519 pack(mdata, pos, 2*
sizeof(
int));
521 pack(msgitem.
data(), pos, mdata[1]);
528 int crcsize = bufsize -
sizeof(
CRCTYPE);
532 *(
static_cast<CRCTYPE *
>(pos)) = crcval;
569 int *mdata =
static_cast<int *
>(pos);
572 int nitems = mdata[2];
573 int bufsize = mdata[3];
582 ERRORMSG(
"Stopping due to abort request sent from node " << node <<
endl);
588 ERRORMSG(
"Exiting due to exit request sent from node " << node <<
endl);
612 int crcsize = bufsize -
sizeof(
CRCTYPE);
620 void *origloc =
static_cast<void *
>(
static_cast<char *
>(buffer)+crcsize);
622 if (crcval != origcrc)
624 ERRORMSG(
"Failed CRC check (" << crcval <<
" != " << origcrc);
626 ERRORMSG(
" for message " << mnum <<
" of size " << bufsize);
632 requestList.push_back(std::pair<int,MsgNum_t>(node, mnum));
637 PInsist(crcval == origcrc,
"Exiting due to CRC check failure.");
652 for (
int j = 0; j < nitems; j++)
654 int *hdr =
static_cast<int *
>(pos);
656 int bytesize = hdr[1];
663 if (bytesize > 0 && elements > 0)
667 newmsg->
putmsg(pos, bytesize/elements, elements);
677 informOKList.push_back(std::pair<int,MsgNum_t>(node, mnum));
697 ERRORMSG(
"ERROR: Trying to cache an already-cached message with num = ");
707 char *copybuf =
new char[msgsize];
708 memcpy(copybuf, msgbuf, msgsize);
724 dbgmsg <<
"Requesting retransmission of message " << mnum <<
" from node ";
725 dbgmsg << node <<
endl;
748 PInsist(nitems == 1,
"Wrong number of items in retransmit request.");
751 int *hdr =
static_cast<int *
>(pos);
753 "Wrong message info for retransmit message number.");
758 dbgmsg <<
"Received request to resend message " << mnum <<
endl;
770 "Could not find message in local sent cache to retransmit.");
773 int size = (*senti).second.size();
774 int node = (*senti).second.node();
777 ERRORMSG(
"WARNING: Resending message number " << mnum);
778 ERRORMSG(
" of size " << size <<
" from node ");
789 dbgmsg <<
"Informing node " << node <<
" that message " << mnum;
790 dbgmsg <<
" was received ok." <<
endl;
814 PInsist(nitems >= 1,
"Wrong number of items in retransmit request.");
818 for (
int i=0; i < nitems; ++i)
821 int *hdr =
static_cast<int *
>(pos);
823 "Wrong message info for message-ok number.");
831 dbgmsg <<
"Will clear message " << mnum <<
" as OK." <<
endl;
849 ERRORMSG(
"ERROR: Received 'message ok' for message " << mnum);
850 ERRORMSG(
", but this node does not have that message in cache." <<
endl);
856 (*senti).second.freebuf();
858 dbgmsg <<
"Removed message " << mnum <<
" from send cache; now ";
871 dbgmsg <<
"Clearing " <<
sentOKList.size() <<
" and resending ";
void clear_ok_messages(int nitems, void *pos)
std::ostream & operator<<(std::ostream &os, const Attribute &attr)
Communicate(int argc=0, char **argv=NULL, int procs=(-1))
std::vector< MessageData > recMsgList
virtual ~Communicate(void)
bool add_msg(Message *, int, int)
static unsigned int wordround(int size)
void fill_msg_buffer(void *, Message &, int, int, int)
unsigned int numBytes() const
std::vector< std::vector< int > > Processes
std::vector< int > Contexts
int find_msg_length(Message &)
virtual bool mysend(Message *, int node, int utag, int etag)
virtual void mybarrier(void)
void request_retransmission(int node, MsgNum_t mnum)
static bool useChecksums()
CRCTYPE crc(void *icp, int icnt)
constexpr double c
The velocity of light in m/s.
static void addwordround(void *&pos, int size)
Message & putmsg(void *, int, int=0)
unsigned int numElems() const
virtual int broadcast_others(Message *, int, bool delmsg=true)
T * value_type(const SliceIterator< T > &)
Message & put(const T &val)
virtual void cleanupMessage(void *)
void process_resend_requests()
Message * find_msg(int &, int &)
#define ADDIPPLSTAT(stat, amount)
static void pack(void *packdata, void *&pos, int size)
std::vector< std::pair< int, MsgNum_t > > informOKList
void unpack_retransmission_request(int nitems, void *pos)
std::vector< MsgNum_t > sentOKList
#define IPPL_RETRANSMIT_TAG
std::string::iterator iterator
virtual bool resend(void *buf, int size, int node, int etag)
std::vector< std::pair< int, MsgNum_t > > requestList
void perform_resend(MsgNum_t mnum)
Message * receive_block(int &node, int &tag)
virtual Message * myreceive(int &node, int &tag, int etag)
Message * receive(int &node, int &tag)
Message & setDelete(const bool c)
Message * unpack_message(int &node, int &tag, void *pos)
virtual const char * name() const
Message & setCopy(const bool c)
bool send(Message *, int node, int tag, bool delmsg=true)
std::vector< MsgNum_t > resendList
virtual int broadcast_all(Message *, int)
void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node)
Inform & endl(Inform &inf)
void send_ok_message(int node, MsgNum_t mnum)
void remove_single_ok_message(MsgNum_t mnum)