OPAL (Object Oriented Parallel Accelerator Library)  2024.1
OPAL
AsyncSendBuffers.h
Go to the documentation of this file.
1 //
2 // Class AsyncSendBuffer and AsyncSendBuffers
3 //
4 // Copyright (c) 2010 - 2013, Yves Ineichen, ETH Zürich
5 // All rights reserved
6 //
7 // Implemented as part of the PhD thesis
8 // "Toward massively parallel multi-objective optimization with application to
9 // particle accelerators" (https://doi.org/10.3929/ethz-a-009792359)
10 //
11 // This file is part of OPAL.
12 //
13 // OPAL is free software: you can redistribute it and/or modify
14 // it under the terms of the GNU General Public License as published by
15 // the Free Software Foundation, either version 3 of the License, or
16 // (at your option) any later version.
17 //
18 // You should have received a copy of the GNU General Public License
19 // along with OPAL. If not, see <https://www.gnu.org/licenses/>.
20 //
21 #include <vector>
22 #include <algorithm>
23 #include <string.h>
24 #include "mpi.h"
25 #include "boost/bind.hpp"
26 
28 
29 public:
30 
31  AsyncSendBuffer(std::ostringstream &os) {
32  this->size_req = new MPI_Request();
33  this->buffer_req = new MPI_Request();
34  this->buf_size = os.str().length();
35  buffer = new char[buf_size];
36  memcpy(buffer, os.str().c_str(), buf_size);
37  }
38 
40  delete size_req;
41  delete buffer_req;
42  delete[] buffer;
43  }
44 
45  bool hasCompleted() {
46  int bufferflag = 0;
47  MPI_Test(this->buffer_req, &bufferflag, MPI_STATUS_IGNORE);
48  if(bufferflag) {
49  int sizeflag = 0;
50  MPI_Test(this->buffer_req, &sizeflag, MPI_STATUS_IGNORE);
51  if(sizeflag) {
52  return true;
53  }
54  }
55  return false;
56  }
57 
58  void send(int recv_rank, int size_tag, int data_tag, MPI_Comm comm) {
59  MPI_Isend(&buf_size, 1, MPI_LONG, recv_rank, size_tag, comm, size_req);
60  MPI_Isend(buffer, buf_size, MPI_CHAR, recv_rank, data_tag, comm, buffer_req);
61  }
62 
63 
64 private:
65 
66  // can't use smart pointers because MPI will hold last valid reference to
67  // pointer
68  MPI_Request *size_req;
69  MPI_Request *buffer_req;
70  char *buffer;
71 
72  size_t buf_size;
73 
74 };
75 
76 
78 
79 public:
81 
82  void insert(std::shared_ptr<AsyncSendBuffer> buf) {
83  collection_.push_back(buf);
84  }
85 
86  void cleanup() {
87  collection_.erase(std::remove_if(
88  collection_.begin(), collection_.end(), boost::bind(&AsyncSendBuffer::hasCompleted, _1)),
89  collection_.end());
90  }
91 
92  size_t size() {
93  return collection_.size();
94  }
95 
96 private:
97  std::vector< std::shared_ptr<AsyncSendBuffer> > collection_;
98 };
99 
MPI_Request * size_req
AsyncSendBuffer(std::ostringstream &os)
void send(int recv_rank, int size_tag, int data_tag, MPI_Comm comm)
std::vector< std::shared_ptr< AsyncSendBuffer > > collection_
MPI_Request * buffer_req
void insert(std::shared_ptr< AsyncSendBuffer > buf)