OPAL (Object Oriented Parallel Accelerator Library)  2.2.0
OPAL
Communicate.cpp
Go to the documentation of this file.
1 // -*- C++ -*-
2 /***************************************************************************
3  *
4  * The IPPL Framework
5  *
6  * This program was prepared by PSI.
7  * All rights in the program are reserved by PSI.
8  * Neither PSI nor the author(s)
9  * makes any warranty, express or implied, or assumes any liability or
10  * responsibility for the use of this software
11  *
12  * Visit www.amas.web.psi for more details
13  *
14  ***************************************************************************/
15 
16 // -*- C++ -*-
17 /***************************************************************************
18  *
19  * The IPPL Framework
20  *
21  *
22  * Visit http://people.web.psi.ch/adelmann/ for more details
23  *
24  ***************************************************************************/
25 
27 // Communicate - common member functions for Communicate object.
28 // On-node traffic
29 // is handled here and architecture-specific routines are called for off-node
30 // traffic. This is the base class for all comm-lib-specific subclasses.
32 
33 // include files
34 #include "Message/Communicate.h"
35 #include "Message/Message.h"
36 #include "Message/CRC.h"
37 #include "PETE/IpplExpressions.h"
38 
39 #include "Utility/IpplInfo.h"
40 #include "Utility/IpplStats.h"
42 #include "Utility/PAssert.h"
43 #include <cstdio>
44 
45 
47 // print summary of this class to the given output stream
48 std::ostream& operator<<(std::ostream& o, const Communicate& c)
49 {
50 
51  o << "Parallel communication method: " << c.name() << "\n";
52  o << " Total nodes: " << c.getNodes() << ", Current node: ";
53  o << c.myNode() << "\n";
54  o << " Queued received messages: ";
55  o << c.getReceived() <<"\n";
56 
57  return o;
58 }
59 
60 
62 // Constructor.
63 // arguments: command-line args, and number of processes
64 // to start (if < 0, start the 'default' number, i.e. the number of
65 // hosts
66 // Note: The base-class constructor does not need the argument info or
67 // the number of nodes, it just by default sets the number of nodes=1
68 // Also note: the derived classes should erase Contexts and Processes, and
69 // put in the proper values.
// Base-class constructor. The three parameters are unnamed and unused here;
// the object is set up as a single-node configuration and the message
// counter nextMsgNum starts at 1.
70 Communicate::Communicate(int, char **, int)
71  : nextMsgNum(1)
72 {
73 
74  // initialize data for Communicate
75  TotalNodes = 1;
76  myHost = 0;
// NOTE(review): listing line 77 is absent from this extract, so one
// statement of the original constructor is not shown here -- confirm
// against the original file before relying on this block.
78  Contexts.push_back(1);
// Processes is a vector of vector<int>; the one-element Contexts vector is
// reused as its single entry.
79  Processes.push_back(Contexts); // using Contexts is just convenient here
80 }
81 
82 
84 // Destructor. Nothing to do at present.
86 {
87 
88 
89  // delete the cached messages
90  SentCache_t::iterator cachei = sentMsgCache.begin();
91  for ( ; cachei != sentMsgCache.end(); ++cachei)
92  (*cachei).second.freebuf();
93 }
94 
95 
97 // Add a new on-node message to the linked list. Return success.
98 bool Communicate::add_msg(Message *msg, int node, int tag)
99 {
100  recMsgList.push_back(MessageData(node, tag, msg));
101  return true;
102 }
103 
104 
106 // Looks for a message in the message queue from the specified node
107 // and tag. This understands wildcards for node and tag.
108 // Returns a pointer to the Message object found, and sets node and
109 // tag equal to the proper values. Also, this will remove the item from
110 // the queue.
111 Message* Communicate::find_msg(int& node, int& tag)
112 {
113 
114  // just find the first message that meets the criteria
117  for ( ; qi != qend ; ++qi)
118  {
119  if ((node == COMM_ANY_NODE || (*qi).node == node) &&
120  (tag == COMM_ANY_TAG || (*qi).tag == tag))
121  {
122  node = (*qi).node;
123  tag = (*qi).tag;
124  Message *retval = (*qi).msg;
125  recMsgList.erase(qi);
126  //INCIPPLSTAT(incMessageReceived);
127  //INCIPPLSTAT(incMessageReceivedFromQueue);
128  return retval;
129  }
130  }
131 
132  // if we're here, no message was found
133  return 0;
134 }
135 
136 
138 // Default version of virtual send function ... here, does nothing.
139 bool Communicate::mysend(Message *, int, int, int)
140 {
141 
142  // just return false, since we cannot send a message with this function
143  return false;
144 }
145 
146 
148 // Default version of virtual receive function ... here, does nothing.
150 {
151 
152  // just return NULL, since we cannot find a message with this function
153  return 0;
154 }
155 
156 
158 // Default version of virtual barrier function ... here, does nothing.
160 {
161 
162 
163  // just return NULL, since we cannot find a message with this function
164  return;
165 }
166 
167 
169 // resent a message buffer that has been previously packed and copied
170 // into the provided buffer. Return success.
171 bool Communicate::resend(void *, int, int, int)
172 {
173 
174  // just return false, since we cannot resend a message with this function
175  return false;
176 }
177 
178 
180 // Send data to the given node, with given tag. If delmsg==true, the
181 // message will be deleted after it is sent, otherwise it will be left alone.
//
// Returns false without sending when node is outside [0, getNodes()) or
// when tag is negative. A message addressed to this node is appended to the
// local receive queue through add_msg() and is never deleted here. Any
// other destination goes through mysend(); msg is deleted only when that
// call succeeds and delmsg is true. On the paths that reach the end of the
// function, ErrorStatus is set to COMM_NOERROR or COMM_NOSEND.
182 bool Communicate::send(Message *msg, int node, int tag, bool delmsg)
183 {
184  bool retval;
185 
186  // process list of resend requests
187  //process_resend_requests();
188 
189  // check for problems ...
// note: the same message text is printed when the tag, not the node, is bad
190  if ( node < 0 || node >= getNodes() || tag < 0 )
191  {
192  ERRORMSG("Communicate: illegal send node " << node << endl);
// NOTE(review): listing line 193 is absent from this extract, so one
// statement of the rejection branch is not shown here -- confirm against
// the original file.
194  return false;
195  }
196 
197  // if the message is addressed to this node, put it in the local receive
198  // queue immediately
199  if ( node == myNode() )
200  {
201  retval = add_msg(msg, node, tag);
202  //INCIPPLSTAT(incMessageSent);
203  //INCIPPLSTAT(incMessageSentToSelf);
204  }
205  else
206  {
207  // the message must be sent elsewhere ... call the proper function
208  retval = mysend(msg, node, tag, COMM_SEND_TAG);
209 
210  // if the send was successful, delete the message if requested
// (a failed mysend() leaves msg alive, whatever delmsg says)
211  if (retval)
212  {
213  //INCIPPLSTAT(incMessageSent);
214  //INCIPPLSTAT(incMessageSentToOthers);
215  if (delmsg)
216  delete msg;
217  }
218  }
219 
220  // set error code
221  ErrorStatus = (retval != 0 ? COMM_NOERROR : COMM_NOSEND);
222 
223  // return the success of the operation
224  return retval;
225 }
226 
227 
229 // Receive data from another node. Returns newly created Message object
230 // with received message, or NULL if no message is available.
231 // If node == COMM_ANY_NODE, this will receive the next message with the given
232 // tag from any node.
233 // If tag == COMM_ANY_TAG, this will receive the next message with
234 // any tag from the given node. If both are wildcards, this will receive the
235 // next message, period. node and tag are passed by reference; if either
236 // is a wildcard, and a message is received, they are changed to their actual
237 // values.
238 // Messages are searched for in this order (if node == COMM_ANY_NODE) :
239 // 1. Pending in network
240 // 2. In receive queue
241 Message* Communicate::receive(int& node, int& tag)
242 {
243 
244 
245  //Inform dbgmsg("Comm::receive", INFORM_ALL_NODES);
246  //dbgmsg << "Doing receive from node " << node << ", tag " << tag << endl;
247 
248  // process list of resend requests
250 
251  // do a check for a message from another node
252  //dbgmsg << "Checking for queued message ..." << endl;
253  Message *msg = find_msg(node, tag);
254  //dbgmsg << "Found one? " << (msg != 0 ? "yes" : "no") << endl;
255 
256  if (msg == 0 && myNode() != node)
257  {
258  int checknode = node;
259  int checktag = tag;
260  //INCIPPLSTAT(incMessageReceiveChecks);
261  //dbgmsg << "Checking for remote message ..." << endl;
262  if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0)
263  {
264  // see if the message matches our criteria for searching
265  //dbgmsg << "Message found from node " << checknode << " with tag ";
266  //dbgmsg << checktag << endl;
267  if ((node != COMM_ANY_NODE && node != checknode) ||
268  (tag != COMM_ANY_TAG && tag != checktag ))
269  {
270  // the message does not match; queue it and report no msg found
271  //dbgmsg << "But it's not what we want." << endl;
272  add_msg(msg, checknode, checktag);
273  msg = 0;
274  }
275  else
276  {
277  // the message matches; save the node and tag and return the msg
278  //dbgmsg << "And it is what we want!" << endl;
279  node = checknode;
280  tag = checktag;
281  //INCIPPLSTAT(incMessageReceived);
282  //INCIPPLSTAT(incMessageReceivedFromNetwork);
283  }
284  }
285  else
286  {
287  //INCIPPLSTAT(incMessageReceiveChecksFailed);
288  }
289  }
290 
291  // set error code
292  ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
293 
294  // return the message, or NULL if none was found
295  return msg;
296 }
297 
298 
300 // A blocking version of receive.
301 Message *Communicate::receive_block(int& node, int &tag)
302 {
303 
304 
305 
306 
307 
308  // process list of resend requests
310 
311  //Inform dbgmsg("Comm::receive_block", INFORM_ALL_NODES);
312  //dbgmsg << "Doing blocking receive from node " << node;
313  //dbgmsg << ", tag " << tag << endl;
314 
315  // If we haven't already found a message, check the local messages
316  //dbgmsg << "Checking for queued message ..." << endl;
317 
318  Message *msg = find_msg(node, tag);
319 
320  //dbgmsg << "Found one? " << (msg != 0 ? "yes" : "no") << endl;
321 
322  // keep checking for remote msgs until we get one
323 
324  if (myNode() != node)
325  {
326  while (msg == 0)
327  {
328  // process list of resend requests
330 
331  int checknode = node;
332  int checktag = tag;
333  //INCIPPLSTAT(incMessageReceiveChecks);
334  //dbgmsg << "Checking for remote message ..." << endl;
335  if ((msg = myreceive(checknode, checktag, COMM_SEND_TAG)) != 0)
336  {
337  // see if the message matches our criteria for searching
338  //dbgmsg << "Message found from node " << checknode << " with tag ";
339  //dbgmsg << checktag << endl;
340  if ((node != COMM_ANY_NODE && node != checknode) ||
341  (tag != COMM_ANY_TAG && tag != checktag ))
342  {
343  // the message does not match; queue it and report no msg found
344  //dbgmsg << "But it's not what we want." << endl;
345  add_msg(msg, checknode, checktag);
346  msg = 0;
347  }
348  else
349  {
350  // the message matches; save the node and tag and return the msg
351  //dbgmsg << "And it is what we want!" << endl;
352  node = checknode;
353  tag = checktag;
354  //INCIPPLSTAT(incMessageReceived);
355  //INCIPPLSTAT(incMessageReceivedFromNetwork);
356  }
357  }
358  else
359  {
360  //INCIPPLSTAT(incMessageReceiveChecksFailed);
361  if (Ippl::retransmit())
362  msg = find_msg(node, tag);
363  }
364  }
365  }
366 
367 
368  // If we're on just one node, and we did not find a message, this is
369  // a big problem.
370  PInsist(!(myNode() == node && msg == 0),
371  "Local message not found in Communicate::receive_block!!");
372 
373  // set error code
374  ErrorStatus = (msg != 0 ? COMM_NOERROR : COMM_NORECEIVE);
375 
376  // return the message, or NULL if none was found
377  return msg;
378 }
379 
380 
382 // Broadcast the given message to ALL nodes, including this node.
383 // Return number of nodes sent to.
384 // Arguments are the Message, and the tag for the message.
386 {
387  int i; // loop variable
388 
389  // send message to all other nodes
390  for (i=(getNodes() - 1); i >= 0; i--)
391  {
392  if (i != myNode())
393  {
394  mysend(msg, i, tag, COMM_SEND_TAG);
395  //INCIPPLSTAT(incMessageSent);
396  //INCIPPLSTAT(incMessageSentToOthers);
397  }
398  }
399 
400  // send message to this node; since we do this, don't need to delete msg
401  add_msg(msg, myNode(), tag);
402  //INCIPPLSTAT(incMessageSent);
403  //INCIPPLSTAT(incMessageSentToSelf);
404 
405  return getNodes();
406 }
407 
408 
410 // Broadcast the given message to all OTHER nodes, but not this node.
411 // Return number of nodes sent to.
412 // Arguments are the Message, and the tag for the message, and whether
413 // we should delete the given message object.
414 int Communicate::broadcast_others(Message *msg, int tag, bool delmsg)
415 {
416  int i; // loop variable
417 
418  // send message to all other nodes
419  for (i=(getNodes() - 1); i >= 0; i--)
420  {
421  if (i != myNode())
422  {
423  mysend(msg, i, tag, COMM_SEND_TAG);
424  //INCIPPLSTAT(incMessageSent);
425  //INCIPPLSTAT(incMessageSentToOthers);
426  }
427  }
428 
429  // delete message
430  if (delmsg)
431  delete msg;
432 
433  return getNodes() - 1;
434 }
435 
436 
438 // Synchronize all processors (everybody waits for everybody
439 // else to get here before returning to calling function).
441 {
442 
443 
444  mybarrier();
445  //INCIPPLSTAT(incBarriers);
446 }
447 
448 
450 // clean up after a Message has been used (called by Message). By
451 // default, does nothing.
453 
454 
456 // calculate how big the buffer must be to send the given message
458 {
459 
460  static const unsigned int longsize = wordround(sizeof(MsgNum_t));
461  static const unsigned int intsize4 = wordround(4 * sizeof(int));
462  static const unsigned int intsize2 = wordround(2 * sizeof(int));
463 
464  // the message contains a long and three integers at the start with the
465  // msg num, node, tag, and number of items
466  unsigned int buffsize = longsize + intsize4;
467 
468  // now include the sizes of the elements themselves. For each item,
469  // we also include two integers with size information.
470  int nitems = msg.size();
471  for (int i=0; i < nitems; ++i)
472  buffsize += (intsize2 + wordround(msg.item(i).numBytes()));
473 
474  // if checksums are to be performed, add in space for the 32-bit checksum
475  if (Ippl::useChecksums())
476  buffsize += sizeof(CRCTYPE);
477 
478  return buffsize;
479 }
480 
481 
483 // put data from the given Message into the given buffer
484 void Communicate::fill_msg_buffer(void *buffer, Message &msg, int tag,
485  int bufsize, int node)
486 {
487 
488  void *pos = buffer; // location in buffer to pack data
489  int nitems = msg.size(); // Number of items in Message
490  int mdata[4]; // Array to store msg header info
491  MsgNum_t mnum = (nextMsgNum++); // Message ID
492 
493  //Inform dbgmsg("***Communicate::fill_msg_buffer", INFORM_ALL_NODES);
494  //dbgmsg << "Preparing to send out message " << mnum;
495  //dbgmsg << " with tag " << tag << " of size " << bufsize << endl;
496 
497  // we must make sure to zero out the buffer if we're using checksums,
498  // so that random data values do not occur in the spaces where word
499  // alignment padding is used
500  if (Ippl::useChecksums())
501  memset(pos, 0, bufsize);
502 
503  // put message ID info into the buffer
504  pack(&mnum, pos, sizeof(MsgNum_t));
505 
506  // put message header info into the buffer
507  mdata[0] = tag;
508  mdata[1] = myNode();
509  mdata[2] = nitems;
510  mdata[3] = bufsize;
511  pack(mdata, pos, 4*sizeof(int));
512 
513  // finally pack in the data
514  for (int i=0; i < nitems; ++i)
515  {
516  Message::MsgItem &msgitem = msg.item(i);
517  mdata[0] = msgitem.numElems();
518  mdata[1] = msgitem.numBytes();
519  pack(mdata, pos, 2*sizeof(int));
520  if (mdata[1] > 0)
521  pack(msgitem.data(), pos, mdata[1]);
522  }
523 
524  // if checksums are on, find the checksum and append it to the buffer
525  if (Ippl::useChecksums())
526  {
527  // calculate the crc
528  int crcsize = bufsize - sizeof(CRCTYPE);
529  CRCTYPE crcval = crc(buffer, crcsize);
530 
531  // append it to the end of the buffer
532  *(static_cast<CRCTYPE *>(pos)) = crcval;
533 
534  // if we're trying to retransmit, cache the message
535  if (Ippl::retransmit())
536  {
537  if (tag != IPPL_RETRANSMIT_TAG && tag != IPPL_MSG_OK_TAG)
538  {
539  //dbgmsg << "Adding message " << mnum << " of size " << bufsize;
540  //dbgmsg << " with tag " << tag << " to sent cache." << endl;
541  add_to_send_cache(buffer, mnum, bufsize, node);
542  }
543  else
544  {
545  //dbgmsg << "NOT adding msg with tag " << tag << " to cache" << endl;
546  }
547  }
548  }
549 
550  ADDIPPLSTAT(incMessageBytesSent,bufsize);
551 }
552 
553 
555 // get data out of a buffer and create a Message
556 Message* Communicate::unpack_message(int &node, int &tag, void *buffer)
557 {
558 
559  Message *newmsg = 0;
560 
561  // pos will always point to the next location in the buffer to get data
562  void *pos = buffer;
563 
564  // get the message ID number
565  MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
566  addwordround(pos, sizeof(MsgNum_t));
567 
568  // get the tag, sender, and number of messages
569  int *mdata = static_cast<int *>(pos);
570  tag = mdata[0];
571  node = mdata[1];
572  int nitems = mdata[2];
573  int bufsize = mdata[3];
574  addwordround(pos, 4*sizeof(int));
575 
576  //WARNMSG("Received message " << mnum << " from node " << node);
577  //WARNMSG(" with tag " << tag << " of size " << bufsize << endl);
578 
579  // check for special tags, to abort, retransmit, or just receive
580  if (tag == IPPL_ABORT_TAG)
581  {
582  ERRORMSG("Stopping due to abort request sent from node " << node << endl);
583  ::abort();
584 
585  }
586  else if (tag == IPPL_EXIT_TAG)
587  {
588  ERRORMSG("Exiting due to exit request sent from node " << node << endl);
589  ::exit(1);
590 
591  }
592  else if (tag == IPPL_RETRANSMIT_TAG)
593  {
594  // get the retransmit message number and tag out of the current buffer
595  unpack_retransmission_request(nitems, pos);
596 
597  }
598  else if (tag == IPPL_MSG_OK_TAG)
599  {
600  // clear out the messages that this message lists are OK to be deleted
601  clear_ok_messages(nitems, pos);
602 
603  }
604  else
605  {
606  // this is just a regular message
607 
608  // do checksum comparison, if checksums are on
609  if (Ippl::useChecksums())
610  {
611  // calculate the crc
612  int crcsize = bufsize - sizeof(CRCTYPE);
613  CRCTYPE crcval = crc(buffer, crcsize);
614 
615  // as a test, randomly change crcval
616  //if (IpplRandom() < 0.1)
617  // crcval += 1;
618 
619  // compare this crc to the original one
620  void *origloc = static_cast<void *>(static_cast<char *>(buffer)+crcsize);
621  CRCTYPE origcrc = *(static_cast<CRCTYPE *>(origloc));
622  if (crcval != origcrc)
623  {
624  ERRORMSG("Failed CRC check (" << crcval << " != " << origcrc);
625  ERRORMSG(") on node " << Ippl::myNode());
626  ERRORMSG(" for message " << mnum << " of size " << bufsize);
627  ERRORMSG(" bytes sent from node ");
628  ERRORMSG(node << " with tag " << tag << endl);
629  if (Ippl::retransmit())
630  {
631  // send off a request to have message 'mnum' resent to us by 'node'
632  requestList.push_back(std::pair<int,MsgNum_t>(node, mnum));
633  }
634  else
635  {
636  // since we're not trying to retransmit, we just quit.
637  PInsist(crcval == origcrc, "Exiting due to CRC check failure.");
638  }
639 
640  // and then return 0 so that the caller knows there was a problem
641  return 0;
642  }
643  }
644 
645  // if we're here, the checksums (if enabled) were OK, so receive the
646  // message
647 
648  // create data structure for this message
649  newmsg = new Message(nitems);
650 
651  // get all the items and add to the message
652  for (int j = 0; j < nitems; j++)
653  {
654  int *hdr = static_cast<int *>(pos);
655  int elements = hdr[0];
656  int bytesize = hdr[1];
657  addwordround(pos, 2*sizeof(int));
658 
659  // for each item, find the pointer to the actual data and give
660  // that pointer to the Message object. The Message object then
661  // does not delete the data until the very end, when the Message
662  // is deleted.
663  if (bytesize > 0 && elements > 0)
664  {
665  newmsg->setCopy(false);
666  newmsg->setDelete(false);
667  newmsg->putmsg(pos, bytesize/elements, elements);
668  addwordround(pos, bytesize);
669  }
670  }
671 
672  // indicate we've received a normal message
673  ADDIPPLSTAT(incMessageBytesReceived,bufsize);
674 
675  // tell the sender that we received this message OK
676  if (Ippl::retransmit())
677  informOKList.push_back(std::pair<int,MsgNum_t>(node, mnum));
678  }
679 
680  // return the new message, or zero to indicate the buffer contained
681  // something else than an actual message
682  return newmsg;
683 }
684 
685 
687 // put the given message buffer in the sent-message cache, as a new
688 // CommSendInfo object storing the buffer and other information.
689 void Communicate::add_to_send_cache(void *msgbuf, MsgNum_t mnum, int msgsize,
690  int node)
691 {
692 
693  // make sure we do not already have this message
694  SentCache_t::iterator senti = sentMsgCache.find(mnum);
695  if (senti != sentMsgCache.end())
696  {
697  ERRORMSG("ERROR: Trying to cache an already-cached message with num = ");
698  ERRORMSG(mnum << endl);
699  return;
700  }
701 
702  //Inform dbgmsg("***Communicate::add_to_send_cache", INFORM_ALL_NODES);
703  //dbgmsg << "Adding message " << mnum << " to cache with size = " << msgsize;
704  //dbgmsg << endl;
705 
706  // make a copy of the message
707  char *copybuf = new char[msgsize];
708  memcpy(copybuf, msgbuf, msgsize);
709 
710  // add the message to the cache list
711  CommSendInfo csi(msgsize, copybuf, node);
712  sentMsgCache.insert(SentCache_t::value_type(mnum, csi));
713 
714  //dbgmsg<<"Sent message cached; now " << sentMsgCache.size() << " buffers ";
715  //dbgmsg << "in the cache." << endl;
716 }
717 
718 
720 // send off a request to have this message retransmitted to us
722 {
723  Inform dbgmsg("***Communicate::request_retransmission", INFORM_ALL_NODES);
724  dbgmsg << "Requesting retransmission of message " << mnum << " from node ";
725  dbgmsg << node << endl;
726 
727  // create a regular message, but with the
728  // special retransmit tag and the message number as the one item
729  // in the Message
730  Message msg(1);
731  msg.put(mnum);
732  send(&msg, node, IPPL_RETRANSMIT_TAG, false); // does not delete message
733 }
734 
735 
737 // get the resend information from a buffer sent in a message requesting
738 // retransmission
740 {
741  Inform dbgmsg("***Communicate::unpack_retrans_req", INFORM_ALL_NODES);
742  //dbgmsg << "Unpacking retransmission request ..." << endl;
743 
744  // retransmission messages have the following information as separate
745  // items:
746  // message number to retransmit (type == MsgNum_t)
747  // so, nitems should be one, and the bytesize should match
748  PInsist(nitems == 1, "Wrong number of items in retransmit request.");
749 
750  // get the retransmit message number item header info
751  int *hdr = static_cast<int *>(pos);
752  PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
753  "Wrong message info for retransmit message number.");
754  addwordround(pos, 2*sizeof(int));
755 
756  // get the actual retransmit message number
757  MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
758  dbgmsg << "Received request to resend message " << mnum << endl;
759  resendList.push_back(mnum);
760 }
761 
762 
764 // for message mnum, resend the data
766 {
767  // get the message info from our cache.
768  SentCache_t::iterator senti = sentMsgCache.find(mnum);
769  PInsist(senti != sentMsgCache.end(),
770  "Could not find message in local sent cache to retransmit.");
771 
772  // get the node and size
773  int size = (*senti).second.size();
774  int node = (*senti).second.node();
775 
776  // resend the data
777  ERRORMSG("WARNING: Resending message number " << mnum);
778  ERRORMSG(" of size " << size << " from node ");
779  ERRORMSG(myNode() << " to node " << node << " ..." << endl);
780  resend((*senti).second.buf(), size, node, COMM_SEND_TAG);
781 }
782 
783 
785 // tell the sender that we received this message OK
787 {
788  Inform dbgmsg("***Communicate::send_ok_message", INFORM_ALL_NODES);
789  dbgmsg << "Informing node " << node << " that message " << mnum;
790  dbgmsg << " was received ok." << endl;
791 
792  Message msg(1);
793  msg.put(mnum); // the list of message numbers, one at a time
794 
795  send(&msg, node, IPPL_MSG_OK_TAG, false); // does not delete message
796 }
797 
798 
800 // unpack message with a list of OK message numbers, and delete them
801 // from our cache
802 void Communicate::clear_ok_messages(int nitems, void *pos)
803 {
804  Inform dbgmsg("***Communicate::clear_ok_messages", INFORM_ALL_NODES);
805  //dbgmsg << "Unpacking messages-ok information for " << nitems;
806  //dbgmsg << " messages ..." << endl;
807 
808  // message-ok messages have the following information as separate
809  // items:
810  // the number of OK messages (type == int)
811  // the first OK message number (type == MsgNum_t)
812  // the second OK message number (type == MsgNum_t)
813  // etc
814  PInsist(nitems >= 1, "Wrong number of items in retransmit request.");
815 
816  // loop through the list of items, get the message number from each,
817  // and remove that message from our queue
818  for (int i=0; i < nitems; ++i)
819  {
820  // get the message-ok header
821  int *hdr = static_cast<int *>(pos);
822  PInsist(hdr[0] == 1 && hdr[1] == sizeof(MsgNum_t),
823  "Wrong message info for message-ok number.");
824  addwordround(pos, 2*sizeof(int));
825 
826  // get the message-ok number
827  MsgNum_t mnum = *(static_cast<MsgNum_t *>(pos));
828  addwordround(pos, sizeof(MsgNum_t));
829 
830  // add this number to our list of messages to say are OK
831  dbgmsg << "Will clear message " << mnum << " as OK." << endl;
832  sentOKList.push_back(mnum);
833  }
834 }
835 
836 
838 // unpack message with a list of OK message numbers, and delete them
839 // from our cache
841 {
842  Inform dbgmsg("***Communicate::remove_single_ok_message", INFORM_ALL_NODES);
843 
844  // check if we have that message
845  SentCache_t::iterator senti = sentMsgCache.find(mnum);
846  if (senti == sentMsgCache.end())
847  {
848  // we do not have it; print an error message
849  ERRORMSG("ERROR: Received 'message ok' for message " << mnum);
850  ERRORMSG(", but this node does not have that message in cache." << endl);
851 
852  }
853  else
854  {
855  // we have it, so remove it after freeing the buffer
856  (*senti).second.freebuf();
857  sentMsgCache.erase(senti);
858  dbgmsg << "Removed message " << mnum << " from send cache; now ";
859  dbgmsg << sentMsgCache.size() << " messages in list." << endl;
860  }
861 }
862 
863 
865 // process list of resend requests
867 {
868  if (resendList.size() > 0)
869  {
870  Inform dbgmsg("***Communicate::process_resend_reqs", INFORM_ALL_NODES);
871  dbgmsg << "Clearing " << sentOKList.size() << " and resending ";
872  dbgmsg << resendList.size() << " messages ..." << endl;
873  }
874 
875  // clear out OK messages
876  while (sentOKList.size() > 0)
877  {
878  MsgNum_t mnum = *(sentOKList.begin());
879  sentOKList.erase(sentOKList.begin());
881  }
882 
883  // resend a message, if necessary
884  while (resendList.size() > 0)
885  {
886  MsgNum_t mnum = *(resendList.begin());
887  resendList.erase(resendList.begin());
888  perform_resend(mnum);
889  }
890 
891  // inform other nodes that we've received their messages ok
892  while (informOKList.size() > 0)
893  {
894  int node = (*(informOKList.begin())).first;
895  MsgNum_t mnum = (*(informOKList.begin())).second;
896  informOKList.erase(informOKList.begin());
897  send_ok_message(node, mnum);
898  }
899 
900  // request resends from other nodes
901  while (requestList.size() > 0)
902  {
903  int node = (*(requestList.begin())).first;
904  MsgNum_t mnum = (*(requestList.begin())).second;
905  requestList.erase(requestList.begin());
906  request_retransmission(node, mnum);
907  }
908 }
void clear_ok_messages(int nitems, void *pos)
std::ostream & operator<<(std::ostream &os, const Attribute &attr)
Definition: Attribute.cpp:167
elements
Definition: IndexMap.cpp:141
Communicate(int argc=0, char **argv=NULL, int procs=(-1))
Definition: Communicate.cpp:70
std::vector< MessageData > recMsgList
Definition: Communicate.h:258
virtual ~Communicate(void)
Definition: Communicate.cpp:85
bool add_msg(Message *, int, int)
Definition: Communicate.cpp:98
static unsigned int wordround(int size)
Definition: Communicate.h:350
#define INFORM_ALL_NODES
Definition: Inform.h:38
int myNode() const
Definition: Communicate.h:155
void fill_msg_buffer(void *, Message &, int, int, int)
int getReceived() const
Definition: Communicate.h:163
static bool retransmit()
Definition: IpplInfo.h:221
#define ERRORMSG(msg)
Definition: IpplInfo.h:399
unsigned int numBytes() const
Definition: Message.h:226
std::vector< std::vector< int > > Processes
Definition: Communicate.h:265
std::vector< int > Contexts
Definition: Communicate.h:264
MsgItem & item(size_t n)
Definition: Message.h:316
void barrier(void)
int find_msg_length(Message &)
static int myNode()
Definition: IpplInfo.cpp:794
virtual bool mysend(Message *, int node, int utag, int etag)
virtual void mybarrier(void)
MsgNum_t nextMsgNum
Definition: Communicate.h:270
void request_retransmission(int node, MsgNum_t mnum)
size_t size() const
Definition: Message.h:300
static bool useChecksums()
Definition: IpplInfo.h:218
CRCTYPE crc(void *icp, int icnt)
Definition: CRC.cpp:106
SentCache_t sentMsgCache
Definition: Communicate.h:276
#define IPPL_ABORT_TAG
Definition: Tags.h:25
constexpr double c
The velocity of light in m/s.
Definition: Physics.h:52
static void addwordround(void *&pos, int size)
Definition: Communicate.h:358
Message & putmsg(void *, int, int=0)
unsigned int numElems() const
Definition: Message.h:230
virtual int broadcast_others(Message *, int, bool delmsg=true)
T * value_type(const SliceIterator< T > &)
Message & put(const T &val)
Definition: Message.h:414
virtual void cleanupMessage(void *)
void process_resend_requests()
Message * find_msg(int &, int &)
#define ADDIPPLSTAT(stat, amount)
Definition: IpplStats.h:236
static void pack(void *packdata, void *&pos, int size)
Definition: Communicate.h:364
std::vector< std::pair< int, MsgNum_t > > informOKList
Definition: Communicate.h:285
void unpack_retransmission_request(int nitems, void *pos)
unsigned int CRCTYPE
Definition: CRC.h:25
std::vector< MsgNum_t > sentOKList
Definition: Communicate.h:282
#define IPPL_RETRANSMIT_TAG
Definition: Tags.h:27
std::string::iterator iterator
Definition: MSLang.h:16
#define PInsist(c, m)
Definition: PAssert.h:135
virtual bool resend(void *buf, int size, int node, int etag)
std::vector< std::pair< int, MsgNum_t > > requestList
Definition: Communicate.h:288
void perform_resend(MsgNum_t mnum)
Message * receive_block(int &node, int &tag)
virtual Message * myreceive(int &node, int &tag, int etag)
Message * receive(int &node, int &tag)
Definition: Inform.h:41
Message & setDelete(const bool c)
Definition: Message.h:339
Message * unpack_message(int &node, int &tag, void *pos)
virtual const char * name() const
Definition: Communicate.h:137
#define IPPL_MSG_OK_TAG
Definition: Tags.h:28
Message & setCopy(const bool c)
Definition: Message.h:327
bool send(Message *, int node, int tag, bool delmsg=true)
std::vector< MsgNum_t > resendList
Definition: Communicate.h:279
virtual int broadcast_all(Message *, int)
void add_to_send_cache(void *pos, MsgNum_t mnum, int size, int node)
void * data()
Definition: Message.h:252
#define IPPL_EXIT_TAG
Definition: Tags.h:26
int getNodes() const
Definition: Communicate.h:143
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
void send_ok_message(int node, MsgNum_t mnum)
void remove_single_ok_message(MsgNum_t mnum)