50 o <<
"Parallel communication method: " <<
c.name() <<
"\n";
51 o <<
" Total nodes: " <<
c.getNodes() <<
", Current node: ";
52 o <<
c.myNode() <<
"\n";
53 o <<
" Queued received messages: ";
54 o <<
c.getReceived() <<
"\n";
91 (*cachei).second.freebuf();
116 for ( ; qi != qend ; ++qi)
189 if ( node < 0 || node >=
getNodes() || tag < 0 )
191 ERRORMSG(
"Communicate: illegal send node " << node <<
endl);
200 retval =
add_msg(msg, node, tag);
255 if (msg == 0 &&
myNode() != node)
257 int checknode = node;
271 add_msg(msg, checknode, checktag);
330 int checknode = node;
344 add_msg(msg, checknode, checktag);
370 "Local message not found in Communicate::receive_block!!");
389 for (i=(
getNodes() - 1); i >= 0; i--)
418 for (i=(
getNodes() - 1); i >= 0; i--)
460 static const unsigned int intsize4 =
wordround(4 *
sizeof(
int));
461 static const unsigned int intsize2 =
wordround(2 *
sizeof(
int));
465 unsigned int buffsize = longsize + intsize4;
469 int nitems = msg.
size();
470 for (
int i=0; i < nitems; ++i)
484 int bufsize,
int node)
488 int nitems = msg.
size();
500 memset(pos, 0, bufsize);
510 pack(mdata, pos, 4*
sizeof(
int));
513 for (
int i=0; i < nitems; ++i)
518 pack(mdata, pos, 2*
sizeof(
int));
520 pack(msgitem.
data(), pos, mdata[1]);
527 int crcsize = bufsize -
sizeof(
CRCTYPE);
531 *(
static_cast<CRCTYPE *
>(pos)) = crcval;
568 int *mdata =
static_cast<int *
>(pos);
571 int nitems = mdata[2];
572 int bufsize = mdata[3];
581 ERRORMSG(
"Stopping due to abort request sent from node " << node <<
endl);
587 ERRORMSG(
"Exiting due to exit request sent from node " << node <<
endl);
611 int crcsize = bufsize -
sizeof(
CRCTYPE);
619 void *origloc =
static_cast<void *
>(
static_cast<char *
>(buffer)+crcsize);
621 if (crcval != origcrc)
623 ERRORMSG(
"Failed CRC check (" << crcval <<
" != " << origcrc);
625 ERRORMSG(
" for message " << mnum <<
" of size " << bufsize);
631 requestList.push_back(std::pair<int,MsgNum_t>(node, mnum));
636 PInsist(crcval == origcrc,
"Exiting due to CRC check failure.");
651 for (
int j = 0; j < nitems; j++)
653 int *hdr =
static_cast<int *
>(pos);
655 int bytesize = hdr[1];
676 informOKList.push_back(std::pair<int,MsgNum_t>(node, mnum));
696 ERRORMSG(
"ERROR: Trying to cache an already-cached message with num = ");
706 char *copybuf =
new char[msgsize];
707 memcpy(copybuf, msgbuf, msgsize);
723 dbgmsg <<
"Requesting retransmission of message " << mnum <<
" from node ";
724 dbgmsg << node <<
endl;
747 PInsist(nitems == 1,
"Wrong number of items in retransmit request.");
750 int *hdr =
static_cast<int *
>(pos);
752 "Wrong message info for retransmit message number.");
757 dbgmsg <<
"Received request to resend message " << mnum <<
endl;
769 "Could not find message in local sent cache to retransmit.");
772 int size = (*senti).second.size();
773 int node = (*senti).second.node();
776 ERRORMSG(
"WARNING: Resending message number " << mnum);
777 ERRORMSG(
" of size " << size <<
" from node ");
788 dbgmsg <<
"Informing node " << node <<
" that message " << mnum;
789 dbgmsg <<
" was received ok." <<
endl;
813 PInsist(nitems >= 1,
"Wrong number of items in retransmit request.");
817 for (
int i=0; i < nitems; ++i)
820 int *hdr =
static_cast<int *
>(pos);
822 "Wrong message info for message-ok number.");
830 dbgmsg <<
"Will clear message " << mnum <<
" as OK." <<
endl;
848 ERRORMSG(
"ERROR: Received 'message ok' for message " << mnum);
849 ERRORMSG(
", but this node does not have that message in cache." <<
endl);
855 (*senti).second.freebuf();
857 dbgmsg <<
"Removed message " << mnum <<
" from send cache; now ";
870 dbgmsg <<
"Clearing " <<
sentOKList.size() <<
" and resending ";
T * value_type(const SliceIterator< T > &)
std::ostream & operator<<(std::ostream &o, const Communicate &c)
CRCTYPE crc(void *icp, int icnt)
#define IPPL_RETRANSMIT_TAG
Inform & endl(Inform &inf)
#define ADDIPPLSTAT(stat, amount)
constexpr double c
The velocity of light in m/s.
std::string::iterator iterator
virtual bool mysend(Message *, int node, int utag, int etag)
std::vector< int > Contexts
void request_retransmission(int node, MsgNum_t mnum)
virtual Message * myreceive(int &node, int &tag, int etag)
void perform_resend(MsgNum_t mnum)
void remove_single_ok_message(MsgNum_t mnum)
std::vector< MessageData > recMsgList
static void addwordround(void *&pos, int size)
Message * receive(int &node, int &tag)
bool send(Message *, int node, int tag, bool delmsg=true)
Message * find_msg(int &, int &)
void send_ok_message(int node, MsgNum_t mnum)
std::vector< MsgNum_t > resendList
std::vector< std::pair< int, MsgNum_t > > informOKList
int find_msg_length(Message &)
virtual void cleanupMessage(void *)
void unpack_retransmission_request(int nitems, void *pos)
virtual int broadcast_others(Message *, int, bool delmsg=true)
std::vector< std::pair< int, MsgNum_t > > requestList
static unsigned int wordround(int size)
void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node)
Communicate(int argc=0, char **argv=NULL, int procs=(-1))
virtual bool resend(void *buf, int size, int node, int etag)
Message * receive_block(int &node, int &tag)
virtual void mybarrier(void)
std::vector< std::vector< int > > Processes
virtual ~Communicate(void)
virtual int broadcast_all(Message *, int)
std::vector< MsgNum_t > sentOKList
bool add_msg(Message *, int, int)
static void pack(void *packdata, void *&pos, int size)
void clear_ok_messages(int nitems, void *pos)
Message * unpack_message(int &node, int &tag, void *pos)
void process_resend_requests()
void fill_msg_buffer(void *, Message &, int, int, int)
Message & setDelete(const bool c)
Message & put(const T &val)
Message & putmsg(void *, int, int=0)
Message & setCopy(const bool c)
unsigned int numBytes() const
unsigned int numElems() const
static bool useChecksums()