OPAL (Object Oriented Parallel Accelerator Library) 2022.1
OPAL
Communicate.cpp
Go to the documentation of this file.
1// -*- C++ -*-
2/***************************************************************************
3 *
4 * The IPPL Framework
5 *
6 * This program was prepared by PSI.
7 * All rights in the program are reserved by PSI.
8 * Neither PSI nor the author(s)
9 * makes any warranty, express or implied, or assumes any liability or
10 * responsibility for the use of this software
11 *
12 * Visit www.amas.web.psi for more details
13 *
14 ***************************************************************************/
15
16// -*- C++ -*-
17/***************************************************************************
18 *
19 * The IPPL Framework
20 *
21 *
22 * Visit http://people.web.psi.ch/adelmann/ for more details
23 *
24 ***************************************************************************/
25
27// Communicate - common member functions for Communicate object.
28// On-node traffic
29// is handled here and architecture specific routines are called for off-node
30// traffic. This is the base class for all comm-lib-specific subclasses.
32
33// include files
34#include "Message/Communicate.h"
35#include "Message/Message.h"
36#include "Message/CRC.h"
38
39#include "Utility/IpplInfo.h"
40#include "Utility/IpplStats.h"
41#include "Utility/PAssert.h"
42#include <cstdio>
43
44
46// print summary of this class to the given output stream
47std::ostream& operator<<(std::ostream& o, const Communicate& c)
48{
49
50 o << "Parallel communication method: " << c.name() << "\n";
51 o << " Total nodes: " << c.getNodes() << ", Current node: ";
52 o << c.myNode() << "\n";
53 o << " Queued received messages: ";
54 o << c.getReceived() <<"\n";
55
56 return o;
57}
58
59
61// Constructor.
62// arguments: command-line args, and number of processes
63// to start (if < 0, start the 'default' number, i.e. the number of
64// hosts).
65// Note: The base-class constructor does not need the argument info or
66// the number of nodes, it just by default sets the number of nodes=1
67// Also note: the derived classes should erase Contexts and Processes, and
68// put in the proper values.
// The base class therefore describes a single node: TotalNodes = 1,
// myHost = 0, Contexts = {1} and Processes = {{1}}. Outgoing message
// numbers (nextMsgNum, consumed by fill_msg_buffer()) start at 1.
69Communicate::Communicate(int, char **, int)
70 : nextMsgNum(1)
71{
72
73 // initialize data for Communicate
74 TotalNodes = 1;
75 myHost = 0;
// NOTE(review): original source line 76 is missing from this rendering;
// it may initialise another member (e.g. ErrorStatus) -- confirm against
// the real file.
77 Contexts.push_back(1);
78 Processes.push_back(Contexts); // using Contexts is just convenient here
79}
80
81
83// Destructor. Nothing to do at present.
85{
86
87
88 // delete the cached messages
89 SentCache_t::iterator cachei = sentMsgCache.begin();
90 for ( ; cachei != sentMsgCache.end(); ++cachei)
91 (*cachei).second.freebuf();
92}
93
94
96// Add a new on-node message to the linked list. Return success.
97bool Communicate::add_msg(Message *msg, int node, int tag)
98{
99 recMsgList.push_back(MessageData(node, tag, msg));
100 return true;
101}
102
103
105// Looks for a message in the message queue from the specified node
106// and tag. This understands wildcards for node and tag.
107// Returns a pointer to the Message object found, and sets node and
108// tag equal to the proper values. Also, this will remove the item from
109// the queue.
// Wildcards are COMM_ANY_NODE and COMM_ANY_TAG. The first matching entry
// of recMsgList is returned (presumably the oldest, since add_msg()
// appends at the back and qi presumably starts at begin()). If nothing
// matches, 0 is returned and node/tag are left unchanged.
110Message* Communicate::find_msg(int& node, int& tag)
111{
112
113 // just find the first message that meets the criteria
// NOTE(review): original source lines 114-115 are missing from this
// rendering; they presumably declare the iterators qi/qend over
// recMsgList -- confirm against the real file.
116 for ( ; qi != qend ; ++qi)
117 {
118 if ((node == COMM_ANY_NODE || (*qi).node == node) &&
119 (tag == COMM_ANY_TAG || (*qi).tag == tag))
120 {
121 node = (*qi).node;
122 tag = (*qi).tag;
123 Message *retval = (*qi).msg;
124 recMsgList.erase(qi);
// qi is invalidated by erase(), but it is not used again: we return now.
125 //INCIPPLSTAT(incMessageReceived);
126 //INCIPPLSTAT(incMessageReceivedFromQueue);
127 return retval;
128 }
129 }
130
131 // if we're here, no message was found
132 return 0;
133}
134
135
137// Default version of virtual send function ... here, does nothing.
138bool Communicate::mysend(Message *, int, int, int)
139{
140
141 // just return false, since we cannot send a message with this function
142 return false;
143}
144
145
147// Default version of virtual receive function ... here, does nothing.
149{
150
151 // just return NULL, since we cannot find a message with this function
152 return 0;
153}
154
155
157// Default version of virtual barrier function ... here, does nothing.
159{
160
161
162 // just return NULL, since we cannot find a message with this function
163 return;
164}
165
166
168// resent a message buffer that has been previously packed and copied
169// into the provided buffer. Return success.
170bool Communicate::resend(void *, int, int, int)
171{
172
173 // just return false, since we cannot resend a message with this function
174 return false;
175}
176
177
179// Send data to the given node, with given tag. If delmsg==true, the
180// message will be deleted after it is sent, otherwise it will be left alone.
// Returns true on success. Details:
//  - node must lie in [0, getNodes()) and tag must be >= 0; otherwise an
//    error is printed and false is returned without touching msg.
//  - A message addressed to this node is queued with add_msg() and is never
//    deleted here, even if delmsg is true: the queue now holds the pointer.
//  - Otherwise it goes through mysend(); msg is deleted (when delmsg) only
//    if mysend() reports success, so after a failure the caller still owns it.
//  - On the paths that reach the end, ErrorStatus is set to COMM_NOERROR or
//    COMM_NOSEND.
181bool Communicate::send(Message *msg, int node, int tag, bool delmsg)
182{
183 bool retval;
184
185 // process list of resend requests
186 //process_resend_requests();
187
188 // check for problems ...
// (the error text mentions only the node, also when just the tag is bad)
189 if ( node < 0 || node >= getNodes() || tag < 0 )
190 {
191 ERRORMSG("Communicate: illegal send node " << node << endl);
// NOTE(review): original source line 192 is missing from this rendering;
// it may set ErrorStatus for this early return -- confirm.
193 return false;
194 }
195
196 // if the message is addressed to this node, put it in the local receive
197 // queue immediately
198 if ( node == myNode() )
199 {
200 retval = add_msg(msg, node, tag);
201 //INCIPPLSTAT(incMessageSent);
202 //INCIPPLSTAT(incMessageSentToSelf);
203 }
204 else
205 {
206 // the message must be sent elsewhere ... call the proper function
207 retval = mysend(msg, node, tag, COMM_SEND_TAG);
208
209 // if the send was successful, delete the message if requested
210 if (retval)
211 {
212 //INCIPPLSTAT(incMessageSent);
213 //INCIPPLSTAT(incMessageSentToOthers);
214 if (delmsg)
215 delete msg;
216 }
217 }
218
219 // set error code
220 ErrorStatus = (retval != 0 ? COMM_NOERROR : COMM_NOSEND);
221
222 // return the success of the operation
223 return retval;
224}
225
226
228// Receive data from another node. Returns newly created Message object
229// with received message, or NULL if no message is available.
230// If node == COMM_ANY_NODE, this will receive the next message with the given
231// tag from any node.
232// If tag == COMM_ANY_TAG, this will receive the next message with
233// any tag from the given node. If both are wildcards, this will receive the
234// next message, period. node and tag are passed by reference; if either
235// is a wildcard, and a message is received, they are changed to their actual
236// values.
237// Messages are searched for in this order:
238// 1. In the local receive queue (find_msg)
239// 2. Pending in network (a single myreceive() call; skipped if node == myNode())
// This call does not block: if the one network message obtained does not
// match node/tag, it is queued with add_msg() for later and 0 is returned.
// ErrorStatus is set to COMM_NOERROR or COMM_NORECEIVE accordingly.
240Message* Communicate::receive(int& node, int& tag)
241{
242
243
244 //Inform dbgmsg("Comm::receive", INFORM_ALL_NODES);
245 //dbgmsg << "Doing receive from node " << node << ", tag " << tag << endl;
246
247 // process list of resend requests
// NOTE(review): original source line 248 is missing from this rendering;
// it presumably calls process_resend_requests() -- confirm.
249
250 // first check the local receive queue
251 //dbgmsg << "Checking for queued message ..." << endl;
252 Message *msg = find_msg(node, tag);
253 //dbgmsg << "Found one? " << (msg != 0 ? "yes" : "no") << endl;
254
255 if (msg == 0 && myNode() != node)
256 {
257 int checknode = node;
258 int checktag = tag;
259 //INCIPPLSTAT(incMessageReceiveChecks);
260 //dbgmsg << "Checking for remote message ..." << endl;
261 if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0)
262 {
263 // see if the message matches our criteria for searching
264 //dbgmsg << "Message found from node " << checknode << " with tag ";
265 //dbgmsg << checktag << endl;
266 if ((node != COMM_ANY_NODE && node != checknode) ||
267 (tag != COMM_ANY_TAG && tag != checktag ))
268 {
269 // the message does not match; queue it and report no msg found
270 //dbgmsg << "But it's not what we want." << endl;
271 add_msg(msg, checknode, checktag);
272 msg = 0;
273 }
274 else
275 {
276 // the message matches; save the node and tag and return the msg
277 //dbgmsg << "And it is what we want!" << endl;
278 node = checknode;
279 tag = checktag;
280 //INCIPPLSTAT(incMessageReceived);
281 //INCIPPLSTAT(incMessageReceivedFromNetwork);
282 }
283 }
284 else
285 {
286 //INCIPPLSTAT(incMessageReceiveChecksFailed);
287 }
288 }
289
290 // set error code
291 ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
292
293 // return the message, or NULL if none was found
294 return msg;
295}
296
297
299// A blocking version of receive.
// Same matching rules as receive(): the local queue is checked first with
// find_msg(). If nothing is found there and the source is not this node,
// myreceive() is polled until a matching network message arrives.
// Non-matching network messages are queued with add_msg(). When a poll
// finds nothing and Ippl::retransmit() is on, the local queue is re-checked.
// If the source is this node itself and no queued message matches, the
// PInsist below aborts, since nothing could ever arrive.
// NOTE(review): original source lines 300 (the signature, declared in the
// header as `Message * receive_block(int &node, int &tag)`), 308 and 328
// are missing from this rendering; 308/328 presumably call
// process_resend_requests() -- confirm against the real file.
301{
302
303
304
305
306
307 // process list of resend requests
309
310 //Inform dbgmsg("Comm::receive_block", INFORM_ALL_NODES);
311 //dbgmsg << "Doing blocking receive from node " << node;
312 //dbgmsg << ", tag " << tag << endl;
313
314 // If we haven't already found a message, check the local messages
315 //dbgmsg << "Checking for queued message ..." << endl;
316
317 Message *msg = find_msg(node, tag);
318
319 //dbgmsg << "Found one? " << (msg != 0 ? "yes" : "no") << endl;
320
321 // keep checking for remote msgs until we get one
322
323 if (myNode() != node)
324 {
325 while (msg == 0)
326 {
327 // process list of resend requests
329
330 int checknode = node;
331 int checktag = tag;
332 //INCIPPLSTAT(incMessageReceiveChecks);
333 //dbgmsg << "Checking for remote message ..." << endl;
334 if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0)
335 {
336 // see if the message matches our criteria for searching
337 //dbgmsg << "Message found from node " << checknode << " with tag ";
338 //dbgmsg << checktag << endl;
339 if ((node != COMM_ANY_NODE && node != checknode) ||
340 (tag != COMM_ANY_TAG && tag != checktag ))
341 {
342 // the message does not match; queue it and report no msg found
343 //dbgmsg << "But it's not what we want." << endl;
344 add_msg(msg, checknode, checktag);
345 msg = 0;
346 }
347 else
348 {
349 // the message matches; save the node and tag and return the msg
350 //dbgmsg << "And it is what we want!" << endl;
351 node = checknode;
352 tag = checktag;
353 //INCIPPLSTAT(incMessageReceived);
354 //INCIPPLSTAT(incMessageReceivedFromNetwork);
355 }
356 }
357 else
358 {
359 //INCIPPLSTAT(incMessageReceiveChecksFailed);
360 if (Ippl::retransmit())
361 msg = find_msg(node, tag);
362 }
363 }
364 }
365
366
367 // If the request was for a message from this node itself and none was
368 // found in the local queue, nothing can ever arrive: fatal error.
369 PInsist(!(myNode() == node && msg == 0),
370 "Local message not found in Communicate::receive_block!!");
371
372 // set error code
373 ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
374
375 // return the message, or NULL if none was found
376 return msg;
377}
378
379
381// Broadcast the given message to ALL nodes, including this node.
382// Return number of nodes sent to.
383// Arguments are the Message, and the tag for the message.
385{
386 int i; // loop variable
387
388 // send message to all other nodes
389 for (i=(getNodes() - 1); i >= 0; i--)
390 {
391 if (i != myNode())
392 {
393 mysend(msg, i, tag, COMM_SEND_TAG);
394 //INCIPPLSTAT(incMessageSent);
395 //INCIPPLSTAT(incMessageSentToOthers);
396 }
397 }
398
399 // send message to this node; since we do this, don't need to delete msg
400 add_msg(msg, myNode(), tag);
401 //INCIPPLSTAT(incMessageSent);
402 //INCIPPLSTAT(incMessageSentToSelf);
403
404 return getNodes();
405}
406
407
409// Broadcast the given message to all OTHER nodes, but not this node.
410// Return number of nodes sent to.
411// Arguments are the Message, and the tag for the message, and whether
412// we should delete the given message object.
413int Communicate::broadcast_others(Message *msg, int tag, bool delmsg)
414{
415 int i; // loop variable
416
417 // send message to all other nodes
418 for (i=(getNodes() - 1); i >= 0; i--)
419 {
420 if (i != myNode())
421 {
422 mysend(msg, i, tag, COMM_SEND_TAG);
423 //INCIPPLSTAT(incMessageSent);
424 //INCIPPLSTAT(incMessageSentToOthers);
425 }
426 }
427
428 // delete message
429 if (delmsg)
430 delete msg;
431
432 return getNodes() - 1;
433}
434
435
437// Synchronize all processors (everybody waits for everybody
438// else to get here before returning to calling function).
440{
441
442
443 mybarrier();
444 //INCIPPLSTAT(incBarriers);
445}
446
447
449// clean up after a Message has been used (called by Message). By
450// default, does nothing.
452
453
455// calculate how big the buffer must be to send the given message
457{
458
459 static const unsigned int longsize = wordround(sizeof(MsgNum_t));
460 static const unsigned int intsize4 = wordround(4 * sizeof(int));
461 static const unsigned int intsize2 = wordround(2 * sizeof(int));
462
463 // the message contains a long and three integers at the start with the
464 // msg num, node, tag, and number of items
465 unsigned int buffsize = longsize + intsize4;
466
467 // now include the sizes of the elements themselves. For each item,
468 // we also include two integers with size information.
469 int nitems = msg.size();
470 for (int i=0; i < nitems; ++i)
471 buffsize += (intsize2 + wordround(msg.item(i).numBytes()));
472
473 // if checksums are to be performed, add in space for the 32-bit checksum
474 if (Ippl::useChecksums())
475 buffsize += sizeof(CRCTYPE);
476
477 return buffsize;
478}
479
480
482// put data from the given Message into the given buffer
483void Communicate::fill_msg_buffer(void *buffer, Message &msg, int tag,
484 int bufsize, int node)
485{
486
487 void *pos = buffer; // location in buffer to pack data
488 int nitems = msg.size(); // Number of items in Message
489 int mdata[4]; // Array to store msg header info
490 MsgNum_t mnum = (nextMsgNum++); // Message ID
491
492 //Inform dbgmsg("***Communicate::fill_msg_buffer", INFORM_ALL_NODES);
493 //dbgmsg << "Preparing to send out message " << mnum;
494 //dbgmsg << " with tag " << tag << " of size " << bufsize << endl;
495
496 // we must make sure to zero out the buffer if we're using checksums,
497 // so that random data values do not occur in the spaces where word
498 // alignment padding is used
499 if (Ippl::useChecksums())
500 memset(pos, 0, bufsize);
501
502 // put message ID info into the buffer
503 pack(&mnum, pos, sizeof(MsgNum_t));
504
505 // put message header info into the buffer
506 mdata[0] = tag;
507 mdata[1] = myNode();
508 mdata[2] = nitems;
509 mdata[3] = bufsize;
510 pack(mdata, pos, 4*sizeof(int));
511
512 // finally pack in the data
513 for (int i=0; i < nitems; ++i)
514 {
515 Message::MsgItem &msgitem = msg.item(i);
516 mdata[0] = msgitem.numElems();
517 mdata[1] = msgitem.numBytes();
518 pack(mdata, pos, 2*sizeof(int));
519 if (mdata[1] > 0)
520 pack(msgitem.data(), pos, mdata[1]);
521 }
522
523 // if checksums are on, find the checksum and append it to the buffer
524 if (Ippl::useChecksums())
525 {
526 // calculate the crc
527 int crcsize = bufsize - sizeof(CRCTYPE);
528 CRCTYPE crcval = crc(buffer, crcsize);
529
530 // append it to the end of the buffer
531 *(static_cast<CRCTYPE *>(pos)) = crcval;
532
533 // if we're trying to retransmit, cache the message
534 if (Ippl::retransmit())
535 {
536 if (tag != IPPL_RETRANSMIT_TAG && tag != IPPL_MSG_OK_TAG)
537 {
538 //dbgmsg << "Adding message " << mnum << " of size " << bufsize;
539 //dbgmsg << " with tag " << tag << " to sent cache." << endl;
540 add_to_send_cache(buffer, mnum, bufsize, node);
541 }
542 else
543 {
544 //dbgmsg << "NOT adding msg with tag " << tag << " to cache" << endl;
545 }
546 }
547 }
548
549 ADDIPPLSTAT(incMessageBytesSent,bufsize);
550}
551
552
554// get data out of a buffer and create a Message
// The buffer is expected in the layout written by fill_msg_buffer().
// node and tag are always overwritten with the sender and tag from the
// buffer header. Behaviour by tag:
//   IPPL_ABORT_TAG      -> print an error and ::abort()
//   IPPL_EXIT_TAG       -> print an error and ::exit(1)
//   IPPL_RETRANSMIT_TAG -> retransmission request (see missing line below)
//   IPPL_MSG_OK_TAG     -> acknowledgements handed to clear_ok_messages()
//   anything else       -> verify the CRC (if enabled) and build a Message
// Returns the new Message, or 0 for control messages and CRC failures.
555Message* Communicate::unpack_message(int &node, int &tag, void *buffer)
556{
557
558 Message *newmsg = 0;
559
560 // pos will always point to the next location in the buffer to get data
561 void *pos = buffer;
562
563 // get the message ID number
564 MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
565 addwordround(pos, sizeof(MsgNum_t));
566
567 // get the tag, sender, number of items, and total buffer size
568 int *mdata = static_cast<int *>(pos);
569 tag = mdata[0];
570 node = mdata[1];
571 int nitems = mdata[2];
572 int bufsize = mdata[3];
573 addwordround(pos, 4*sizeof(int));
574
575 //WARNMSG("Received message " << mnum << " from node " << node);
576 //WARNMSG(" with tag " << tag << " of size " << bufsize << endl);
577
578 // check for special tags, to abort, retransmit, or just receive
579 if (tag == IPPL_ABORT_TAG)
580 {
581 ERRORMSG("Stopping due to abort request sent from node " << node << endl);
582 ::abort();
583
584 }
585 else if (tag == IPPL_EXIT_TAG)
586 {
587 ERRORMSG("Exiting due to exit request sent from node " << node << endl);
588 ::exit(1);
589
590 }
591 else if (tag == IPPL_RETRANSMIT_TAG)
592 {
593 // get the retransmit message number and tag out of the current buffer
// NOTE(review): original source line 594 is missing from this rendering;
// it presumably calls unpack_retransmission_request(nitems, pos) -- confirm.
595
596 }
597 else if (tag == IPPL_MSG_OK_TAG)
598 {
599 // clear out the messages that this message lists are OK to be deleted
600 clear_ok_messages(nitems, pos);
601
602 }
603 else
604 {
605 // this is just a regular message
606
607 // do checksum comparison, if checksums are on
608 if (Ippl::useChecksums())
609 {
610 // calculate the crc
611 int crcsize = bufsize - sizeof(CRCTYPE);
612 CRCTYPE crcval = crc(buffer, crcsize);
613
614 // as a test, randomly change crcval
615 //if (IpplRandom() < 0.1)
616 // crcval += 1;
617
618 // compare this crc to the original one
619 void *origloc = static_cast<void *>(static_cast<char *>(buffer)+crcsize);
620 CRCTYPE origcrc = *(static_cast<CRCTYPE *>(origloc));
621 if (crcval != origcrc)
622 {
623 ERRORMSG("Failed CRC check (" << crcval << " != " << origcrc);
624 ERRORMSG(") on node " << Ippl::myNode());
625 ERRORMSG(" for message " << mnum << " of size " << bufsize);
626 ERRORMSG(" bytes sent from node ");
627 ERRORMSG(node << " with tag " << tag << endl);
628 if (Ippl::retransmit())
629 {
630 // send off a request to have message 'mnum' resent to us by 'node'
// (queued only; process_resend_requests() sends the actual request)
631 requestList.push_back(std::pair<int,MsgNum_t>(node, mnum));
632 }
633 else
634 {
635 // since we're not trying to retransmit, we just quit.
636 PInsist(crcval == origcrc, "Exiting due to CRC check failure.");
637 }
638
639 // and then return 0 so that the caller knows there was a problem
640 return 0;
641 }
642 }
643
644 // if we're here, the checksums (if enabled) were OK, so receive the
645 // message
646
647 // create data structure for this message
648 newmsg = new Message(nitems);
649
650 // get all the items and add to the message
651 for (int j = 0; j < nitems; j++)
652 {
653 int *hdr = static_cast<int *>(pos);
654 int elements = hdr[0];
655 int bytesize = hdr[1];
656 addwordround(pos, 2*sizeof(int));
657
658 // for each item, find the pointer to the actual data and give
659 // that pointer to the Message object. The Message object then
660 // does not delete the data until the very end, when the Message
661 // is deleted.
// NOTE(review): setCopy(false)/setDelete(false) suggest the Message
// references the bytes inside 'buffer' without owning them, so 'buffer'
// presumably has to outlive the Message -- confirm with the callers.
// NOTE(review): fill_msg_buffer() packs item bytes whenever the byte count
// is positive, but they are skipped here only when elements > 0 as well;
// an item with bytesize > 0 and elements == 0 would desynchronise 'pos'.
662 if (bytesize > 0 && elements > 0)
663 {
664 newmsg->setCopy(false);
665 newmsg->setDelete(false);
666 newmsg->putmsg(pos, bytesize/elements, elements);
667 addwordround(pos, bytesize);
668 }
669 }
670
671 // indicate we've received a normal message
672 ADDIPPLSTAT(incMessageBytesReceived,bufsize);
673
674 // tell the sender that we received this message OK
// (queued only; process_resend_requests() sends the acknowledgement)
675 if (Ippl::retransmit())
676 informOKList.push_back(std::pair<int,MsgNum_t>(node, mnum));
677 }
678
679 // return the new message, or zero to indicate the buffer contained
680 // something else than an actual message
681 return newmsg;
682}
683
684
686// put the given message buffer in the sent-message cache, as a new
687// CommSendInfo object storing the buffer and other information.
688void Communicate::add_to_send_cache(void *msgbuf, MsgNum_t mnum, int msgsize,
689 int node)
690{
691
692 // make sure we do not already have this message
693 SentCache_t::iterator senti = sentMsgCache.find(mnum);
694 if (senti != sentMsgCache.end())
695 {
696 ERRORMSG("ERROR: Trying to cache an already-cached message with num = ");
697 ERRORMSG(mnum << endl);
698 return;
699 }
700
701 //Inform dbgmsg("***Communicate::add_to_send_cache", INFORM_ALL_NODES);
702 //dbgmsg << "Adding message " << mnum << " to cache with size = " << msgsize;
703 //dbgmsg << endl;
704
705 // make a copy of the message
706 char *copybuf = new char[msgsize];
707 memcpy(copybuf, msgbuf, msgsize);
708
709 // add the message to the cache list
710 CommSendInfo csi(msgsize, copybuf, node);
711 sentMsgCache.insert(SentCache_t::value_type(mnum, csi));
712
713 //dbgmsg<<"Sent message cached; now " << sentMsgCache.size() << " buffers ";
714 //dbgmsg << "in the cache." << endl;
715}
716
717
719// send off a request to have this message retransmitted to us
721{
722 Inform dbgmsg("***Communicate::request_retransmission", INFORM_ALL_NODES);
723 dbgmsg << "Requesting retransmission of message " << mnum << " from node ";
724 dbgmsg << node << endl;
725
726 // create a regular message, but with the
727 // special retransmit tag and the message number as the one item
728 // in the Message
729 Message msg(1);
730 msg.put(mnum);
731 send(&msg, node, IPPL_RETRANSMIT_TAG, false); // does not delete message
732}
733
734
736// get the resend information from a buffer sent in a message requesting
737// retransmission
739{
740 Inform dbgmsg("***Communicate::unpack_retrans_req", INFORM_ALL_NODES);
741 //dbgmsg << "Unpacking retransmission request ..." << endl;
742
743 // retransmission messages have the following information as separate
744 // items:
745 // message number to retransmit (type == MsgNum_t)
746 // so, nitems should be one, and the bytesize should match
747 PInsist(nitems == 1, "Wrong number of items in retransmit request.");
748
749 // get the retransmit message number item header info
750 int *hdr = static_cast<int *>(pos);
751 PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
752 "Wrong message info for retransmit message number.");
753 addwordround(pos, 2*sizeof(int));
754
755 // get the actual retransmit message number
756 MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
757 dbgmsg << "Received request to resend message " << mnum << endl;
758 resendList.push_back(mnum);
759}
760
761
763// for message mnum, resend the data
765{
766 // get the message info from our cache.
767 SentCache_t::iterator senti = sentMsgCache.find(mnum);
768 PInsist(senti != sentMsgCache.end(),
769 "Could not find message in local sent cache to retransmit.");
770
771 // get the node and size
772 int size = (*senti).second.size();
773 int node = (*senti).second.node();
774
775 // resend the data
776 ERRORMSG("WARNING: Resending message number " << mnum);
777 ERRORMSG(" of size " << size << " from node ");
778 ERRORMSG(myNode() << " to node " << node << " ..." << endl);
779 resend((*senti).second.buf(), size, node, COMM_SEND_TAG);
780}
781
782
784// tell the sender that we received this message OK
786{
787 Inform dbgmsg("***Communicate::send_ok_message", INFORM_ALL_NODES);
788 dbgmsg << "Informing node " << node << " that message " << mnum;
789 dbgmsg << " was received ok." << endl;
790
791 Message msg(1);
792 msg.put(mnum); // the list of message numbers, one at a time
793
794 send(&msg, node, IPPL_MSG_OK_TAG, false); // does not delete message
795}
796
797
799// unpack message with a list of OK message numbers, and delete them
800// from our cache
801void Communicate::clear_ok_messages(int nitems, void *pos)
802{
803 Inform dbgmsg("***Communicate::clear_ok_messages", INFORM_ALL_NODES);
804 //dbgmsg << "Unpacking messages-ok information for " << nitems;
805 //dbgmsg << " messages ..." << endl;
806
807 // message-ok messages have the following information as separate
808 // items:
809 // the number of OK messages (type == int)
810 // the first OK message number (type == MsgNum_t)
811 // the second OK message number (type == MsgNum_t)
812 // etc
813 PInsist(nitems >= 1, "Wrong number of items in retransmit request.");
814
815 // loop through the list of items, get the message number from each,
816 // and remove that message from our queue
817 for (int i=0; i < nitems; ++i)
818 {
819 // get the message-ok header
820 int *hdr = static_cast<int *>(pos);
821 PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
822 "Wrong message info for message-ok number.");
823 addwordround(pos, 2*sizeof(int));
824
825 // get the message-ok number
826 MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
827 addwordround(pos, sizeof(MsgNum_t));
828
829 // add this number to our list of messages to say are OK
830 dbgmsg << "Will clear message " << mnum << " as OK." << endl;
831 sentOKList.push_back(mnum);
832 }
833}
834
835
837// unpack message with a list of OK message numbers, and delete them
838// from our cache
840{
841 Inform dbgmsg("***Communicate::remove_single_ok_message", INFORM_ALL_NODES);
842
843 // check if we have that message
844 SentCache_t::iterator senti = sentMsgCache.find(mnum);
845 if (senti == sentMsgCache.end())
846 {
847 // we do not have it; print an error message
848 ERRORMSG("ERROR: Received 'message ok' for message " << mnum);
849 ERRORMSG(", but this node does not have that message in cache." << endl);
850
851 }
852 else
853 {
854 // we have it, so remove it after freeing the buffer
855 (*senti).second.freebuf();
856 sentMsgCache.erase(senti);
857 dbgmsg << "Removed message " << mnum << " from send cache; now ";
858 dbgmsg << sentMsgCache.size() << " messages in list." << endl;
859 }
860}
861
862
864// process list of resend requests
// Drains, in this order, the four bookkeeping queues filled while
// unpacking messages:
//   1. sentOKList   - acknowledged message numbers (see missing line below)
//   2. resendList   - requests from other nodes; perform_resend() for each
//   3. informOKList - (node, mnum) pairs we acknowledge via send_ok_message()
//   4. requestList  - (node, mnum) pairs we ask to be resent via
//                     request_retransmission()
// Each entry is removed from the front of its queue before it is handled.
// Every queue is a std::vector, so erase(begin()) is linear and draining is
// quadratic in the queue length.
// NOTE(review): original source lines 865 (the signature, declared as
// `void process_resend_requests()`) and 879 are missing from this
// rendering; 879 presumably calls remove_single_ok_message(mnum), since
// mnum is otherwise unused in that loop -- confirm against the real file.
866{
// (the debug summary is printed only when there is something to resend)
867 if (!resendList.empty())
868 {
869 Inform dbgmsg("***Communicate::process_resend_reqs", INFORM_ALL_NODES);
870 dbgmsg << "Clearing " << sentOKList.size() << " and resending ";
871 dbgmsg << resendList.size() << " messages ..." << endl;
872 }
873
874 // clear out OK messages
875 while (!sentOKList.empty())
876 {
877 MsgNum_t mnum = *(sentOKList.begin());
878 sentOKList.erase(sentOKList.begin());
880 }
881
882 // resend a message, if necessary
883 while (!resendList.empty())
884 {
885 MsgNum_t mnum = *(resendList.begin());
886 resendList.erase(resendList.begin());
887 perform_resend(mnum);
888 }
889
890 // inform other nodes that we've received their messages ok
891 while (!informOKList.empty())
892 {
893 int node = (*(informOKList.begin())).first;
894 MsgNum_t mnum = (*(informOKList.begin())).second;
895 informOKList.erase(informOKList.begin());
896 send_ok_message(node, mnum);
897 }
898
899 // request resends from other nodes
900 while (!requestList.empty())
901 {
902 int node = (*(requestList.begin())).first;
903 MsgNum_t mnum = (*(requestList.begin())).second;
904 requestList.erase(requestList.begin());
905 request_retransmission(node, mnum);
906 }
907}
T * value_type(const SliceIterator< T > &)
elements
Definition: IndexMap.cpp:163
CRCTYPE crc(void *icp, int icnt)
Definition: CRC.cpp:106
unsigned int CRCTYPE
Definition: CRC.h:25
#define IPPL_EXIT_TAG
Definition: Tags.h:26
#define IPPL_RETRANSMIT_TAG
Definition: Tags.h:27
#define IPPL_MSG_OK_TAG
Definition: Tags.h:28
#define IPPL_ABORT_TAG
Definition: Tags.h:25
std::ostream & operator<<(std::ostream &o, const Communicate &c)
Definition: Communicate.cpp:47
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
#define INFORM_ALL_NODES
Definition: Inform.h:39
#define PInsist(c, m)
Definition: PAssert.h:120
#define ADDIPPLSTAT(stat, amount)
Definition: IpplStats.h:237
#define ERRORMSG(msg)
Definition: IpplInfo.h:350
constexpr double c
The velocity of light in m/s.
Definition: Physics.h:45
std::string::iterator iterator
Definition: MSLang.h:16
virtual bool mysend(Message *, int node, int utag, int etag)
std::vector< int > Contexts
Definition: Communicate.h:264
void request_retransmission(int node, MsgNum_t mnum)
virtual Message * myreceive(int &node, int &tag, int etag)
void perform_resend(MsgNum_t mnum)
void remove_single_ok_message(MsgNum_t mnum)
std::vector< MessageData > recMsgList
Definition: Communicate.h:258
static void addwordround(void *&pos, int size)
Definition: Communicate.h:358
Message * receive(int &node, int &tag)
bool send(Message *, int node, int tag, bool delmsg=true)
Message * find_msg(int &, int &)
void send_ok_message(int node, MsgNum_t mnum)
std::vector< MsgNum_t > resendList
Definition: Communicate.h:279
std::vector< std::pair< int, MsgNum_t > > informOKList
Definition: Communicate.h:285
int find_msg_length(Message &)
virtual void cleanupMessage(void *)
void unpack_retransmission_request(int nitems, void *pos)
virtual int broadcast_others(Message *, int, bool delmsg=true)
std::vector< std::pair< int, MsgNum_t > > requestList
Definition: Communicate.h:288
static unsigned int wordround(int size)
Definition: Communicate.h:350
void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node)
Communicate(int argc=0, char **argv=NULL, int procs=(-1))
Definition: Communicate.cpp:69
MsgNum_t nextMsgNum
Definition: Communicate.h:270
virtual bool resend(void *buf, int size, int node, int etag)
Message * receive_block(int &node, int &tag)
void barrier(void)
virtual void mybarrier(void)
std::vector< std::vector< int > > Processes
Definition: Communicate.h:265
int getNodes() const
Definition: Communicate.h:143
virtual ~Communicate(void)
Definition: Communicate.cpp:84
virtual int broadcast_all(Message *, int)
std::vector< MsgNum_t > sentOKList
Definition: Communicate.h:282
bool add_msg(Message *, int, int)
Definition: Communicate.cpp:97
static void pack(void *packdata, void *&pos, int size)
Definition: Communicate.h:364
void clear_ok_messages(int nitems, void *pos)
Message * unpack_message(int &node, int &tag, void *pos)
void process_resend_requests()
int myNode() const
Definition: Communicate.h:155
SentCache_t sentMsgCache
Definition: Communicate.h:276
void fill_msg_buffer(void *, Message &, int, int, int)
size_t size() const
Definition: Message.h:292
Message & putmsg(void *, int, int=0)
Message & setCopy(const bool c)
Definition: Message.h:319
Message & put(const T &val)
Definition: Message.h:406
MsgItem & item(size_t n)
Definition: Message.h:308
Message & setDelete(const bool c)
Definition: Message.h:331
unsigned int numBytes() const
Definition: Message.h:218
unsigned int numElems() const
Definition: Message.h:222
void * data()
Definition: Message.h:244
Definition: Inform.h:42
static bool retransmit()
Definition: IpplInfo.h:195
static bool useChecksums()
Definition: IpplInfo.h:192
static int myNode()
Definition: IpplInfo.cpp:691