OPAL (Object Oriented Parallel Accelerator Library)  2021.1.99
OPAL
Communicate.cpp
Go to the documentation of this file.
1 // -*- C++ -*-
2 /***************************************************************************
3  *
4  * The IPPL Framework
5  *
6  * This program was prepared by PSI.
7  * All rights in the program are reserved by PSI.
8  * Neither PSI nor the author(s)
9  * makes any warranty, express or implied, or assumes any liability or
10  * responsibility for the use of this software
11  *
12  * Visit www.amas.web.psi for more details
13  *
14  ***************************************************************************/
15 
16 // -*- C++ -*-
17 /***************************************************************************
18  *
19  * The IPPL Framework
20  *
21  *
22  * Visit http://people.web.psi.ch/adelmann/ for more details
23  *
24  ***************************************************************************/
25 
// Communicate - common member functions for Communicate object.
// On-node traffic is handled here, and architecture-specific routines are
// called for off-node traffic. This is the base class for all
// comm-lib-specific subclasses.
32 
33 // include files
#include "Message/Communicate.h"
#include "Message/Message.h"
#include "Message/CRC.h"
#include "PETE/IpplExpressions.h"

#include "Utility/IpplInfo.h"
#include "Utility/IpplStats.h"
#include "Utility/PAssert.h"

#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <utility>
#include <vector>
43 
44 
46 // print summary of this class to the given output stream
47 std::ostream& operator<<(std::ostream& o, const Communicate& c)
48 {
49 
50  o << "Parallel communication method: " << c.name() << "\n";
51  o << " Total nodes: " << c.getNodes() << ", Current node: ";
52  o << c.myNode() << "\n";
53  o << " Queued received messages: ";
54  o << c.getReceived() <<"\n";
55 
56  return o;
57 }
58 
59 
// Constructor.
// The arguments (command-line argc/argv and the requested number of
// processes; per the original notes a value < 0 means "start the default
// number", i.e. the number of hosts) are ignored by this base class.
// The base class always describes a single-node setup: one node, this node
// is host 0, and a single context/process list.
// Message IDs handed out by fill_msg_buffer() start at 1 (nextMsgNum).
// Derived, comm-library-specific classes are expected to erase Contexts and
// Processes and put in the proper values.
Communicate::Communicate(int, char **, int)
  : nextMsgNum(1)
{

  // initialize data for Communicate
  TotalNodes = 1;
  myHost = 0;
  // NOTE(review): the rendered listing this code was taken from skips one
  // source line at this point; confirm against the repository copy that no
  // initialization is missing here.
  Contexts.push_back(1);
  Processes.push_back(Contexts); // using Contexts is just convenient here
}
80 
81 
83 // Destructor. Nothing to do at present.
85 {
86 
87 
88  // delete the cached messages
89  SentCache_t::iterator cachei = sentMsgCache.begin();
90  for ( ; cachei != sentMsgCache.end(); ++cachei)
91  (*cachei).second.freebuf();
92 }
93 
94 
96 // Add a new on-node message to the linked list. Return success.
97 bool Communicate::add_msg(Message *msg, int node, int tag)
98 {
99  recMsgList.push_back(MessageData(node, tag, msg));
100  return true;
101 }
102 
103 
105 // Looks for a message in the message queue from the specified node
106 // and tag. This understands wildcards for node and tag.
107 // Returns a pointer to the Message object found, and sets node and
108 // tag equal to the proper values. Also, this will remove the item from
109 // the queue.
110 Message* Communicate::find_msg(int& node, int& tag)
111 {
112 
113  // just find the first message that meets the criteria
116  for ( ; qi != qend ; ++qi)
117  {
118  if ((node == COMM_ANY_NODE || (*qi).node == node) &&
119  (tag == COMM_ANY_TAG || (*qi).tag == tag))
120  {
121  node = (*qi).node;
122  tag = (*qi).tag;
123  Message *retval = (*qi).msg;
124  recMsgList.erase(qi);
125  //INCIPPLSTAT(incMessageReceived);
126  //INCIPPLSTAT(incMessageReceivedFromQueue);
127  return retval;
128  }
129  }
130 
131  // if we're here, no message was found
132  return 0;
133 }
134 
135 
137 // Default version of virtual send function ... here, does nothing.
138 bool Communicate::mysend(Message *, int, int, int)
139 {
140 
141  // just return false, since we cannot send a message with this function
142  return false;
143 }
144 
145 
147 // Default version of virtual receive function ... here, does nothing.
149 {
150 
151  // just return NULL, since we cannot find a message with this function
152  return 0;
153 }
154 
155 
157 // Default version of virtual barrier function ... here, does nothing.
159 {
160 
161 
162  // just return NULL, since we cannot find a message with this function
163  return;
164 }
165 
166 
168 // resent a message buffer that has been previously packed and copied
169 // into the provided buffer. Return success.
170 bool Communicate::resend(void *, int, int, int)
171 {
172 
173  // just return false, since we cannot resend a message with this function
174  return false;
175 }
176 
177 
// Send a message to the given node with the given tag.
//
// The tag must be >= 0 and node must lie in [0, getNodes()); otherwise an
// error is printed and false is returned.
// * If node is this node, the message is appended to the local receive
//   queue via add_msg() and is not deleted here, whatever delmsg says.
// * Otherwise the virtual mysend() hook transmits it with envelope tag
//   COMM_SEND_TAG. If that succeeds and delmsg is true, the message is
//   deleted. If mysend() fails, the message is NOT deleted.
// On the paths that reach the end, ErrorStatus is set to COMM_NOERROR or
// COMM_NOSEND. Returns whether the send succeeded.
bool Communicate::send(Message *msg, int node, int tag, bool delmsg)
{
  bool retval;

  // process list of resend requests
  //process_resend_requests();

  // check for problems ...
  if ( node < 0 || node >= getNodes() || tag < 0 )
  {
    ERRORMSG("Communicate: illegal send node " << node << endl);
    // NOTE(review): the rendered listing this code was taken from skips one
    // source line at this point; confirm against the repository copy
    // (ErrorStatus is not updated on this path in the visible code).
    return false;
  }

  // if the message is addressed to this node, put it in the local receive
  // queue immediately
  if ( node == myNode() )
  {
    retval = add_msg(msg, node, tag);
    //INCIPPLSTAT(incMessageSent);
    //INCIPPLSTAT(incMessageSentToSelf);
  }
  else
  {
    // the message must be sent elsewhere ... call the proper function
    retval = mysend(msg, node, tag, COMM_SEND_TAG);

    // if the send was successful, delete the message if requested
    if (retval)
    {
      //INCIPPLSTAT(incMessageSent);
      //INCIPPLSTAT(incMessageSentToOthers);
      if (delmsg)
        delete msg;
    }
  }

  // set error code
  ErrorStatus = (retval != 0 ? COMM_NOERROR : COMM_NOSEND);

  // return the success of the operation
  return retval;
}
225 
226 
228 // Receive data from another node. Returns newly created Message object
229 // with received message, or NULL if no message is available.
230 // If node == COMM_ANY_NODE, this will receive the next message with the given
231 // tag from any node.
232 // If tag == COMM_ANY_TAG, this will receive the next message with
233 // any tag from the given node. If both are wildcards, this will receive the
234 // next message, period. node and tag are passed by reference; if either
235 // is a wildcard, and a message is received, they are changed to their actual
236 // values.
237 // Messages are searched for in this order (if node == COMM_ANY_NODE) :
238 // 1. Pending in network
239 // 2. In receive queue
240 Message* Communicate::receive(int& node, int& tag)
241 {
242 
243 
244  //Inform dbgmsg("Comm::receive", INFORM_ALL_NODES);
245  //dbgmsg << "Doing receive from node " << node << ", tag " << tag << endl;
246 
247  // process list of resend requests
249 
250  // do a check for a message from another node
251  //dbgmsg << "Checking for queued message ..." << endl;
252  Message *msg = find_msg(node, tag);
253  //dbgmsg << "Found one? " << (msg != 0 ? "yes" : "no") << endl;
254 
255  if (msg == 0 && myNode() != node)
256  {
257  int checknode = node;
258  int checktag = tag;
259  //INCIPPLSTAT(incMessageReceiveChecks);
260  //dbgmsg << "Checking for remote message ..." << endl;
261  if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0)
262  {
263  // see if the message matches our criteria for searching
264  //dbgmsg << "Message found from node " << checknode << " with tag ";
265  //dbgmsg << checktag << endl;
266  if ((node != COMM_ANY_NODE && node != checknode) ||
267  (tag != COMM_ANY_TAG && tag != checktag ))
268  {
269  // the message does not match; queue it and report no msg found
270  //dbgmsg << "But it's not what we want." << endl;
271  add_msg(msg, checknode, checktag);
272  msg = 0;
273  }
274  else
275  {
276  // the message matches; save the node and tag and return the msg
277  //dbgmsg << "And it is what we want!" << endl;
278  node = checknode;
279  tag = checktag;
280  //INCIPPLSTAT(incMessageReceived);
281  //INCIPPLSTAT(incMessageReceivedFromNetwork);
282  }
283  }
284  else
285  {
286  //INCIPPLSTAT(incMessageReceiveChecksFailed);
287  }
288  }
289 
290  // set error code
291  ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
292 
293  // return the message, or NULL if none was found
294  return msg;
295 }
296 
297 
299 // A blocking version of receive.
300 Message *Communicate::receive_block(int& node, int &tag)
301 {
302 
303 
304 
305 
306 
307  // process list of resend requests
309 
310  //Inform dbgmsg("Comm::receive_block", INFORM_ALL_NODES);
311  //dbgmsg << "Doing blocking receive from node " << node;
312  //dbgmsg << ", tag " << tag << endl;
313 
314  // If we haven't already found a message, check the local messages
315  //dbgmsg << "Checking for queued message ..." << endl;
316 
317  Message *msg = find_msg(node, tag);
318 
319  //dbgmsg << "Found one? " << (msg != 0 ? "yes" : "no") << endl;
320 
321  // keep checking for remote msgs until we get one
322 
323  if (myNode() != node)
324  {
325  while (msg == 0)
326  {
327  // process list of resend requests
329 
330  int checknode = node;
331  int checktag = tag;
332  //INCIPPLSTAT(incMessageReceiveChecks);
333  //dbgmsg << "Checking for remote message ..." << endl;
334  if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0)
335  {
336  // see if the message matches our criteria for searching
337  //dbgmsg << "Message found from node " << checknode << " with tag ";
338  //dbgmsg << checktag << endl;
339  if ((node != COMM_ANY_NODE && node != checknode) ||
340  (tag != COMM_ANY_TAG && tag != checktag ))
341  {
342  // the message does not match; queue it and report no msg found
343  //dbgmsg << "But it's not what we want." << endl;
344  add_msg(msg, checknode, checktag);
345  msg = 0;
346  }
347  else
348  {
349  // the message matches; save the node and tag and return the msg
350  //dbgmsg << "And it is what we want!" << endl;
351  node = checknode;
352  tag = checktag;
353  //INCIPPLSTAT(incMessageReceived);
354  //INCIPPLSTAT(incMessageReceivedFromNetwork);
355  }
356  }
357  else
358  {
359  //INCIPPLSTAT(incMessageReceiveChecksFailed);
360  if (Ippl::retransmit())
361  msg = find_msg(node, tag);
362  }
363  }
364  }
365 
366 
367  // If we're on just one node, and we did not find a message, this is
368  // a big problem.
369  PInsist(!(myNode() == node && msg == 0),
370  "Local message not found in Communicate::receive_block!!");
371 
372  // set error code
373  ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
374 
375  // return the message, or NULL if none was found
376  return msg;
377 }
378 
379 
381 // Broadcast the given message to ALL nodes, including this node.
382 // Return number of nodes sent to.
383 // Arguments are the Message, and the tag for the message.
385 {
386  int i; // loop variable
387 
388  // send message to all other nodes
389  for (i=(getNodes() - 1); i >= 0; i--)
390  {
391  if (i != myNode())
392  {
393  mysend(msg, i, tag, COMM_SEND_TAG);
394  //INCIPPLSTAT(incMessageSent);
395  //INCIPPLSTAT(incMessageSentToOthers);
396  }
397  }
398 
399  // send message to this node; since we do this, don't need to delete msg
400  add_msg(msg, myNode(), tag);
401  //INCIPPLSTAT(incMessageSent);
402  //INCIPPLSTAT(incMessageSentToSelf);
403 
404  return getNodes();
405 }
406 
407 
409 // Broadcast the given message to all OTHER nodes, but not this node.
410 // Return number of nodes sent to.
411 // Arguments are the Message, and the tag for the message, and whether
412 // we should delete the given message object.
413 int Communicate::broadcast_others(Message *msg, int tag, bool delmsg)
414 {
415  int i; // loop variable
416 
417  // send message to all other nodes
418  for (i=(getNodes() - 1); i >= 0; i--)
419  {
420  if (i != myNode())
421  {
422  mysend(msg, i, tag, COMM_SEND_TAG);
423  //INCIPPLSTAT(incMessageSent);
424  //INCIPPLSTAT(incMessageSentToOthers);
425  }
426  }
427 
428  // delete message
429  if (delmsg)
430  delete msg;
431 
432  return getNodes() - 1;
433 }
434 
435 
437 // Synchronize all processors (everybody waits for everybody
438 // else to get here before returning to calling function).
440 {
441 
442 
443  mybarrier();
444  //INCIPPLSTAT(incBarriers);
445 }
446 
447 
449 // clean up after a Message has been used (called by Message). By
450 // default, does nothing.
452 
453 
455 // calculate how big the buffer must be to send the given message
457 {
458 
459  static const unsigned int longsize = wordround(sizeof(MsgNum_t));
460  static const unsigned int intsize4 = wordround(4 * sizeof(int));
461  static const unsigned int intsize2 = wordround(2 * sizeof(int));
462 
463  // the message contains a long and three integers at the start with the
464  // msg num, node, tag, and number of items
465  unsigned int buffsize = longsize + intsize4;
466 
467  // now include the sizes of the elements themselves. For each item,
468  // we also include two integers with size information.
469  int nitems = msg.size();
470  for (int i=0; i < nitems; ++i)
471  buffsize += (intsize2 + wordround(msg.item(i).numBytes()));
472 
473  // if checksums are to be performed, add in space for the 32-bit checksum
474  if (Ippl::useChecksums())
475  buffsize += sizeof(CRCTYPE);
476 
477  return buffsize;
478 }
479 
480 
// Pack the given Message into `buffer`. Layout, in order:
//   MsgNum_t message ID (the current nextMsgNum, which is then incremented)
//   int tag, int sending node (myNode()), int item count, int bufsize
//   per item: int element count, int byte count, then the bytes (if any)
//   CRCTYPE checksum of the preceding bufsize - sizeof(CRCTYPE) bytes,
//   only when Ippl::useChecksums() is on
// Arguments:
//   buffer  - destination; assumed to hold at least bufsize bytes
//             (presumably sized with find_msg_length() - confirm at callers)
//   msg     - message whose items are packed
//   tag     - user tag stored in the header
//   bufsize - total size stored in the header; also locates the checksum
//   node    - destination node; only recorded in the sent-message cache
// With checksums and retransmission both on, a copy of the packed buffer is
// put in the sent-message cache (except for IPPL_RETRANSMIT_TAG and
// IPPL_MSG_OK_TAG control messages) so it can be resent on request.
void Communicate::fill_msg_buffer(void *buffer, Message &msg, int tag,
                                  int bufsize, int node)
{

  void *pos = buffer;             // location in buffer to pack data
  int nitems = msg.size();        // Number of items in Message
  int mdata[4];                   // Array to store msg header info
  MsgNum_t mnum = (nextMsgNum++); // Message ID

  //Inform dbgmsg("***Communicate::fill_msg_buffer", INFORM_ALL_NODES);
  //dbgmsg << "Preparing to send out message " << mnum;
  //dbgmsg << " with tag " << tag << " of size " << bufsize << endl;

  // we must make sure to zero out the buffer if we're using checksums,
  // so that random data values do not occur in the spaces where word
  // alignment padding is used
  if (Ippl::useChecksums())
    memset(pos, 0, bufsize);

  // put message ID info into the buffer
  pack(&mnum, pos, sizeof(MsgNum_t));

  // put message header info into the buffer
  mdata[0] = tag;
  mdata[1] = myNode();
  mdata[2] = nitems;
  mdata[3] = bufsize;
  pack(mdata, pos, 4*sizeof(int));

  // finally pack in the data: two size ints per item, then its bytes
  for (int i=0; i < nitems; ++i)
  {
    Message::MsgItem &msgitem = msg.item(i);
    mdata[0] = msgitem.numElems();
    mdata[1] = msgitem.numBytes();
    pack(mdata, pos, 2*sizeof(int));
    if (mdata[1] > 0)
      pack(msgitem.data(), pos, mdata[1]);
  }

  // if checksums are on, find the checksum and append it to the buffer
  if (Ippl::useChecksums())
  {
    // calculate the crc over everything except the trailing checksum slot
    int crcsize = bufsize - sizeof(CRCTYPE);
    CRCTYPE crcval = crc(buffer, crcsize);

    // append it to the end of the buffer
    // (pos ends up at offset crcsize only if bufsize matches the packed
    // size, as computed by find_msg_length())
    *(static_cast<CRCTYPE *>(pos)) = crcval;

    // if we're trying to retransmit, cache the message
    if (Ippl::retransmit())
    {
      // control messages of the retransmit protocol are never cached
      if (tag != IPPL_RETRANSMIT_TAG && tag != IPPL_MSG_OK_TAG)
      {
        //dbgmsg << "Adding message " << mnum << " of size " << bufsize;
        //dbgmsg << " with tag " << tag << " to sent cache." << endl;
        add_to_send_cache(buffer, mnum, bufsize, node);
      }
      else
      {
        //dbgmsg << "NOT adding msg with tag " << tag << " to cache" << endl;
      }
    }
  }

  ADDIPPLSTAT(incMessageBytesSent,bufsize);
}
551 
552 
554 // get data out of a buffer and create a Message
555 Message* Communicate::unpack_message(int &node, int &tag, void *buffer)
556 {
557 
558  Message *newmsg = 0;
559 
560  // pos will always point to the next location in the buffer to get data
561  void *pos = buffer;
562 
563  // get the message ID number
564  MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
565  addwordround(pos, sizeof(MsgNum_t));
566 
567  // get the tag, sender, and number of messages
568  int *mdata = static_cast<int *>(pos);
569  tag = mdata[0];
570  node = mdata[1];
571  int nitems = mdata[2];
572  int bufsize = mdata[3];
573  addwordround(pos, 4*sizeof(int));
574 
575  //WARNMSG("Received message " << mnum << " from node " << node);
576  //WARNMSG(" with tag " << tag << " of size " << bufsize << endl);
577 
578  // check for special tags, to abort, retransmit, or just receive
579  if (tag == IPPL_ABORT_TAG)
580  {
581  ERRORMSG("Stopping due to abort request sent from node " << node << endl);
582  ::abort();
583 
584  }
585  else if (tag == IPPL_EXIT_TAG)
586  {
587  ERRORMSG("Exiting due to exit request sent from node " << node << endl);
588  ::exit(1);
589 
590  }
591  else if (tag == IPPL_RETRANSMIT_TAG)
592  {
593  // get the retransmit message number and tag out of the current buffer
594  unpack_retransmission_request(nitems, pos);
595 
596  }
597  else if (tag == IPPL_MSG_OK_TAG)
598  {
599  // clear out the messages that this message lists are OK to be deleted
600  clear_ok_messages(nitems, pos);
601 
602  }
603  else
604  {
605  // this is just a regular message
606 
607  // do checksum comparison, if checksums are on
608  if (Ippl::useChecksums())
609  {
610  // calculate the crc
611  int crcsize = bufsize - sizeof(CRCTYPE);
612  CRCTYPE crcval = crc(buffer, crcsize);
613 
614  // as a test, randomly change crcval
615  //if (IpplRandom() < 0.1)
616  // crcval += 1;
617 
618  // compare this crc to the original one
619  void *origloc = static_cast<void *>(static_cast<char *>(buffer)+crcsize);
620  CRCTYPE origcrc = *(static_cast<CRCTYPE *>(origloc));
621  if (crcval != origcrc)
622  {
623  ERRORMSG("Failed CRC check (" << crcval << " != " << origcrc);
624  ERRORMSG(") on node " << Ippl::myNode());
625  ERRORMSG(" for message " << mnum << " of size " << bufsize);
626  ERRORMSG(" bytes sent from node ");
627  ERRORMSG(node << " with tag " << tag << endl);
628  if (Ippl::retransmit())
629  {
630  // send off a request to have message 'mnum' resent to us by 'node'
631  requestList.push_back(std::pair<int,MsgNum_t>(node, mnum));
632  }
633  else
634  {
635  // since we're not trying to retransmit, we just quit.
636  PInsist(crcval == origcrc, "Exiting due to CRC check failure.");
637  }
638 
639  // and then return 0 so that the caller knows there was a problem
640  return 0;
641  }
642  }
643 
644  // if we're here, the checksums (if enabled) were OK, so receive the
645  // message
646 
647  // create data structure for this message
648  newmsg = new Message(nitems);
649 
650  // get all the items and add to the message
651  for (int j = 0; j < nitems; j++)
652  {
653  int *hdr = static_cast<int *>(pos);
654  int elements = hdr[0];
655  int bytesize = hdr[1];
656  addwordround(pos, 2*sizeof(int));
657 
658  // for each item, find the pointer to the actual data and give
659  // that pointer to the Message object. The Message object then
660  // does not delete the data until the very end, when the Message
661  // is deleted.
662  if (bytesize > 0 && elements > 0)
663  {
664  newmsg->setCopy(false);
665  newmsg->setDelete(false);
666  newmsg->putmsg(pos, bytesize/elements, elements);
667  addwordround(pos, bytesize);
668  }
669  }
670 
671  // indicate we've received a normal message
672  ADDIPPLSTAT(incMessageBytesReceived,bufsize);
673 
674  // tell the sender that we received this message OK
675  if (Ippl::retransmit())
676  informOKList.push_back(std::pair<int,MsgNum_t>(node, mnum));
677  }
678 
679  // return the new message, or zero to indicate the buffer contained
680  // something else than an actual message
681  return newmsg;
682 }
683 
684 
686 // put the given message buffer in the sent-message cache, as a new
687 // CommSendInfo object storing the buffer and other information.
688 void Communicate::add_to_send_cache(void *msgbuf, MsgNum_t mnum, int msgsize,
689  int node)
690 {
691 
692  // make sure we do not already have this message
693  SentCache_t::iterator senti = sentMsgCache.find(mnum);
694  if (senti != sentMsgCache.end())
695  {
696  ERRORMSG("ERROR: Trying to cache an already-cached message with num = ");
697  ERRORMSG(mnum << endl);
698  return;
699  }
700 
701  //Inform dbgmsg("***Communicate::add_to_send_cache", INFORM_ALL_NODES);
702  //dbgmsg << "Adding message " << mnum << " to cache with size = " << msgsize;
703  //dbgmsg << endl;
704 
705  // make a copy of the message
706  char *copybuf = new char[msgsize];
707  memcpy(copybuf, msgbuf, msgsize);
708 
709  // add the message to the cache list
710  CommSendInfo csi(msgsize, copybuf, node);
711  sentMsgCache.insert(SentCache_t::value_type(mnum, csi));
712 
713  //dbgmsg<<"Sent message cached; now " << sentMsgCache.size() << " buffers ";
714  //dbgmsg << "in the cache." << endl;
715 }
716 
717 
719 // send off a request to have this message retransmitted to us
721 {
722  Inform dbgmsg("***Communicate::request_retransmission", INFORM_ALL_NODES);
723  dbgmsg << "Requesting retransmission of message " << mnum << " from node ";
724  dbgmsg << node << endl;
725 
726  // create a regular message, but with the
727  // special retransmit tag and the message number as the one item
728  // in the Message
729  Message msg(1);
730  msg.put(mnum);
731  send(&msg, node, IPPL_RETRANSMIT_TAG, false); // does not delete message
732 }
733 
734 
736 // get the resend information from a buffer sent in a message requesting
737 // retransmission
739 {
740  Inform dbgmsg("***Communicate::unpack_retrans_req", INFORM_ALL_NODES);
741  //dbgmsg << "Unpacking retransmission request ..." << endl;
742 
743  // retransmission messages have the following information as separate
744  // items:
745  // message number to retransmit (type == MsgNum_t)
746  // so, nitems should be one, and the bytesize should match
747  PInsist(nitems == 1, "Wrong number of items in retransmit request.");
748 
749  // get the retransmit message number item header info
750  int *hdr = static_cast<int *>(pos);
751  PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
752  "Wrong message info for retransmit message number.");
753  addwordround(pos, 2*sizeof(int));
754 
755  // get the actual retransmit message number
756  MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
757  dbgmsg << "Received request to resend message " << mnum << endl;
758  resendList.push_back(mnum);
759 }
760 
761 
763 // for message mnum, resend the data
765 {
766  // get the message info from our cache.
767  SentCache_t::iterator senti = sentMsgCache.find(mnum);
768  PInsist(senti != sentMsgCache.end(),
769  "Could not find message in local sent cache to retransmit.");
770 
771  // get the node and size
772  int size = (*senti).second.size();
773  int node = (*senti).second.node();
774 
775  // resend the data
776  ERRORMSG("WARNING: Resending message number " << mnum);
777  ERRORMSG(" of size " << size << " from node ");
778  ERRORMSG(myNode() << " to node " << node << " ..." << endl);
779  resend((*senti).second.buf(), size, node, COMM_SEND_TAG);
780 }
781 
782 
784 // tell the sender that we received this message OK
786 {
787  Inform dbgmsg("***Communicate::send_ok_message", INFORM_ALL_NODES);
788  dbgmsg << "Informing node " << node << " that message " << mnum;
789  dbgmsg << " was received ok." << endl;
790 
791  Message msg(1);
792  msg.put(mnum); // the list of message numbers, one at a time
793 
794  send(&msg, node, IPPL_MSG_OK_TAG, false); // does not delete message
795 }
796 
797 
799 // unpack message with a list of OK message numbers, and delete them
800 // from our cache
801 void Communicate::clear_ok_messages(int nitems, void *pos)
802 {
803  Inform dbgmsg("***Communicate::clear_ok_messages", INFORM_ALL_NODES);
804  //dbgmsg << "Unpacking messages-ok information for " << nitems;
805  //dbgmsg << " messages ..." << endl;
806 
807  // message-ok messages have the following information as separate
808  // items:
809  // the number of OK messages (type == int)
810  // the first OK message number (type == MsgNum_t)
811  // the second OK message number (type == MsgNum_t)
812  // etc
813  PInsist(nitems >= 1, "Wrong number of items in retransmit request.");
814 
815  // loop through the list of items, get the message number from each,
816  // and remove that message from our queue
817  for (int i=0; i < nitems; ++i)
818  {
819  // get the message-ok header
820  int *hdr = static_cast<int *>(pos);
821  PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
822  "Wrong message info for message-ok number.");
823  addwordround(pos, 2*sizeof(int));
824 
825  // get the message-ok number
826  MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
827  addwordround(pos, sizeof(MsgNum_t));
828 
829  // add this number to our list of messages to say are OK
830  dbgmsg << "Will clear message " << mnum << " as OK." << endl;
831  sentOKList.push_back(mnum);
832  }
833 }
834 
835 
837 // unpack message with a list of OK message numbers, and delete them
838 // from our cache
840 {
841  Inform dbgmsg("***Communicate::remove_single_ok_message", INFORM_ALL_NODES);
842 
843  // check if we have that message
844  SentCache_t::iterator senti = sentMsgCache.find(mnum);
845  if (senti == sentMsgCache.end())
846  {
847  // we do not have it; print an error message
848  ERRORMSG("ERROR: Received 'message ok' for message " << mnum);
849  ERRORMSG(", but this node does not have that message in cache." << endl);
850 
851  }
852  else
853  {
854  // we have it, so remove it after freeing the buffer
855  (*senti).second.freebuf();
856  sentMsgCache.erase(senti);
857  dbgmsg << "Removed message " << mnum << " from send cache; now ";
858  dbgmsg << sentMsgCache.size() << " messages in list." << endl;
859  }
860 }
861 
862 
864 // process list of resend requests
866 {
867  if (resendList.size() > 0)
868  {
869  Inform dbgmsg("***Communicate::process_resend_reqs", INFORM_ALL_NODES);
870  dbgmsg << "Clearing " << sentOKList.size() << " and resending ";
871  dbgmsg << resendList.size() << " messages ..." << endl;
872  }
873 
874  // clear out OK messages
875  while (sentOKList.size() > 0)
876  {
877  MsgNum_t mnum = *(sentOKList.begin());
878  sentOKList.erase(sentOKList.begin());
880  }
881 
882  // resend a message, if necessary
883  while (resendList.size() > 0)
884  {
885  MsgNum_t mnum = *(resendList.begin());
886  resendList.erase(resendList.begin());
887  perform_resend(mnum);
888  }
889 
890  // inform other nodes that we've received their messages ok
891  while (informOKList.size() > 0)
892  {
893  int node = (*(informOKList.begin())).first;
894  MsgNum_t mnum = (*(informOKList.begin())).second;
895  informOKList.erase(informOKList.begin());
896  send_ok_message(node, mnum);
897  }
898 
899  // request resends from other nodes
900  while (requestList.size() > 0)
901  {
902  int node = (*(requestList.begin())).first;
903  MsgNum_t mnum = (*(requestList.begin())).second;
904  requestList.erase(requestList.begin());
905  request_retransmission(node, mnum);
906  }
907 }
elements
Definition: IndexMap.cpp:163
T * value_type(const SliceIterator< T > &)
std::ostream & operator<<(std::ostream &o, const Communicate &c)
Definition: Communicate.cpp:47
CRCTYPE crc(void *icp, int icnt)
Definition: CRC.cpp:106
unsigned int CRCTYPE
Definition: CRC.h:25
#define IPPL_EXIT_TAG
Definition: Tags.h:26
#define IPPL_RETRANSMIT_TAG
Definition: Tags.h:27
#define IPPL_MSG_OK_TAG
Definition: Tags.h:28
#define IPPL_ABORT_TAG
Definition: Tags.h:25
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
#define INFORM_ALL_NODES
Definition: Inform.h:39
#define ERRORMSG(msg)
Definition: IpplInfo.h:350
#define ADDIPPLSTAT(stat, amount)
Definition: IpplStats.h:237
#define PInsist(c, m)
Definition: PAssert.h:120
constexpr double c
The velocity of light in m/s.
Definition: Physics.h:51
std::string::iterator iterator
Definition: MSLang.h:16
virtual bool mysend(Message *, int node, int utag, int etag)
std::vector< int > Contexts
Definition: Communicate.h:264
void request_retransmission(int node, MsgNum_t mnum)
virtual Message * myreceive(int &node, int &tag, int etag)
void perform_resend(MsgNum_t mnum)
void remove_single_ok_message(MsgNum_t mnum)
std::vector< MessageData > recMsgList
Definition: Communicate.h:258
static void addwordround(void *&pos, int size)
Definition: Communicate.h:358
Message * receive(int &node, int &tag)
bool send(Message *, int node, int tag, bool delmsg=true)
Message * find_msg(int &, int &)
void send_ok_message(int node, MsgNum_t mnum)
std::vector< MsgNum_t > resendList
Definition: Communicate.h:279
std::vector< std::pair< int, MsgNum_t > > informOKList
Definition: Communicate.h:285
int find_msg_length(Message &)
virtual void cleanupMessage(void *)
void unpack_retransmission_request(int nitems, void *pos)
virtual int broadcast_others(Message *, int, bool delmsg=true)
std::vector< std::pair< int, MsgNum_t > > requestList
Definition: Communicate.h:288
static unsigned int wordround(int size)
Definition: Communicate.h:350
void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node)
Communicate(int argc=0, char **argv=NULL, int procs=(-1))
Definition: Communicate.cpp:69
MsgNum_t nextMsgNum
Definition: Communicate.h:270
virtual bool resend(void *buf, int size, int node, int etag)
Message * receive_block(int &node, int &tag)
void barrier(void)
virtual void mybarrier(void)
std::vector< std::vector< int > > Processes
Definition: Communicate.h:265
int getNodes() const
Definition: Communicate.h:143
virtual ~Communicate(void)
Definition: Communicate.cpp:84
virtual int broadcast_all(Message *, int)
std::vector< MsgNum_t > sentOKList
Definition: Communicate.h:282
bool add_msg(Message *, int, int)
Definition: Communicate.cpp:97
static void pack(void *packdata, void *&pos, int size)
Definition: Communicate.h:364
void clear_ok_messages(int nitems, void *pos)
Message * unpack_message(int &node, int &tag, void *pos)
void process_resend_requests()
int myNode() const
Definition: Communicate.h:155
SentCache_t sentMsgCache
Definition: Communicate.h:276
void fill_msg_buffer(void *, Message &, int, int, int)
Message & setDelete(const bool c)
Definition: Message.h:331
size_t size() const
Definition: Message.h:292
Message & put(const T &val)
Definition: Message.h:406
Message & putmsg(void *, int, int=0)
Message & setCopy(const bool c)
Definition: Message.h:319
MsgItem & item(size_t n)
Definition: Message.h:308
unsigned int numBytes() const
Definition: Message.h:218
void * data()
Definition: Message.h:244
unsigned int numElems() const
Definition: Message.h:222
Definition: Inform.h:42
static bool retransmit()
Definition: IpplInfo.h:195
static bool useChecksums()
Definition: IpplInfo.h:192
static int myNode()
Definition: IpplInfo.cpp:691