OPAL (Object Oriented Parallel Accelerator Library)  2.2.0
OPAL
SamplePilot.h
Go to the documentation of this file.
1 #ifndef __SAMPLE_PILOT_H__
2 #define __SAMPLE_PILOT_H__
3 
4 #include "Pilot/Pilot.h"
5 #include "Sample/SampleWorker.h"
7 
27 template <
28  class Opt_t
29  , class Sim_t
30  , class SolPropagationGraph_t
31  , class Comm_t
32  >
33 class SamplePilot : protected Pilot<Opt_t,
34  Sim_t,
35  SolPropagationGraph_t,
36  Comm_t>
37 {
38 
39 public:
40 
41  SamplePilot(CmdArguments_t args, boost::shared_ptr<Comm_t> comm,
42  functionDictionary_t known_expr_funcs,
43  const DVarContainer_t &dvar,
44  const Expressions::Named_t &obj,
45  const std::map< std::string,
46  std::shared_ptr<SamplingMethod>
47  >& sampleMethods,
48  const std::vector<std::string> &storeobjstr,
49  const std::vector<std::string> &filesToKeep)
50  : Pilot<Opt_t,
51  Sim_t,
52  SolPropagationGraph_t,
53  Comm_t>(args,
54  comm,
55  known_expr_funcs,
56  dvar,
57  obj,
58  Expressions::Named_t(),
59  {},
60  false)
61  , sampleMethods_m(sampleMethods)
62  {
63  if (obj.size() == 0) {
64  // create a dummy objective, base class requires at least 1 objective
65  this->objectives_ = {
66  {"dummy", new Expressions::Expr_t("dummy")}
67  };
68  }
69 
70  this->setup(known_expr_funcs, storeobjstr, filesToKeep);
71  }
72 
73  virtual ~SamplePilot()
74  {}
75 
76 
77 protected:
78 
80  typedef std::map<size_t, Param_t > Jobs_t;
84 
85 
86  virtual
87  void setup(functionDictionary_t known_expr_funcs,
88  const std::vector<std::string> &storeobjstr,
89  const std::vector<std::string> &filesToKeep)
90  {
91  this->global_rank_ = this->comm_->globalRank();
92 
93  this->parseInputFile(known_expr_funcs, false);
94 
95  MPI_Barrier(MPI_COMM_WORLD);
96 
97  // here the control flow starts to diverge
98  if ( this->comm_->isOptimizer() ) { startSampler(); }
99  else if ( this->comm_->isWorker() ) { startWorker(storeobjstr, filesToKeep); }
100  else if ( this->comm_->isPilot() ) { this->startPilot(); }
101  }
102 
103  virtual
104  void startSampler() {
105 
106  std::ostringstream os;
107  os << "\033[01;35m" << " " << this->global_rank_ << " (PID: " << getpid() << ") ▶ Sampler"
108  << "\e[0m" << std::endl;
109  std::cout << os.str() << std::flush;
110 
111  boost::scoped_ptr<Opt_t> opt(
112  new Opt_t(sampleMethods_m, this->objectives_, this->dvars_,
113  this->comm_->getBundle(), this->cmd_args_));
114  opt->initialize();
115 
116  std::cout << "Stop Sampler.." << std::endl;
117  }
118 
120  void startWorker(const std::vector<std::string> &storeobjstr,
121  const std::vector<std::string> &filesToKeep)
122  {
123  std::ostringstream os;
124  os << "\033[01;35m" << " " << this->global_rank_ << " (PID: " << getpid() << ") ▶ Worker"
125  << "\e[0m" << std::endl;
126  std::cout << os.str() << std::flush;
127 
128  size_t pos = this->input_file_.find_last_of("/");
129  std::string tmplfile = this->input_file_;
130  if (pos != std::string::npos)
131  tmplfile = this->input_file_.substr(pos+1);
132  pos = tmplfile.find(".");
133  std::string simName = tmplfile.substr(0,pos);
134 
135  boost::scoped_ptr< SampleWorker<Sim_t> > w(
136  new SampleWorker<Sim_t>(this->objectives_, this->constraints_, simName,
137  this->comm_->getBundle(), this->cmd_args_,
138  storeobjstr, filesToKeep));
139 
140  std::cout << "Stop Worker.." << std::endl;
141  }
142 
143  virtual
144  void postPoll() {
145 
146  // terminating all workers is tricky since we do not know their state.
147  // All workers are notified (to terminate) when opt has converged and
148  // all workers are idle.
149  bool all_worker_idle = true;
150 
151  // in the case where new requests became available after worker
152  // delivered last results (and switched to idle state).
153  for(int i = 0; i < this->total_available_workers_; i++) {
154 
155  if (i == this->my_rank_in_worker_comm_) continue;
156 
157  if (this->is_worker_idle_[i] && request_queue_.size() > 0)
159 
160  all_worker_idle = all_worker_idle && this->is_worker_idle_[i];
161  }
162 
163  // when all workers have been notified we can stop polling
164  if (all_worker_idle && this->has_opt_converged_) {
165  this->continue_polling_ = false;
166  int dummy = 0;
167  for(int worker = 0; worker < this->total_available_workers_; worker++) {
168  MPI_Request req;
169  MPI_Isend(&dummy, 1, MPI_INT, worker,
170  MPI_STOP_TAG, this->worker_comm_, &req);
171  }
172  }
173  }
174 
175 
176  virtual
177  void sendNewJobToWorker(int worker) /*override*/ {
178 
179  // no new jobs once our opt has converged
180  if (this->has_opt_converged_) return;
181 
182  JobIter_t job = request_queue_.begin();
183  size_t jid = job->first;
184 
185  Param_t job_params = job->second;
186  MPI_Send(&jid, 1, MPI_UNSIGNED_LONG, worker, MPI_WORK_JOBID_TAG, this->worker_comm_);
187  MPI_Send_params(job_params, worker, this->worker_comm_);
188 
189  running_job_list_.insert(std::pair<size_t,
190  Param_t >(job->first, job->second));
191  request_queue_.erase(jid);
192  this->is_worker_idle_[worker] = false;
193 
194  std::ostringstream dump;
195  dump << "sent job with ID " << jid << " to worker " << worker
196  << std::endl;
197  this->job_trace_->log(dump);
198 
199  }
200 
201 
202  virtual
203  bool onMessage(MPI_Status status, size_t recv_value) /*override*/ {
204 
205  MPITag_t tag = MPITag_t(status.MPI_TAG);
206  switch(tag) {
207 
208  case WORKER_FINISHED_TAG: {
209 
210  size_t job_id = recv_value;
211 
212  size_t dummy = 1;
213  MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, status.MPI_SOURCE,
214  MPI_WORKER_FINISHED_ACK_TAG, this->worker_comm_);
215 
216  reqVarContainer_t res;
217  MPI_Recv_reqvars(res, status.MPI_SOURCE, this->worker_comm_);
218 
219  running_job_list_.erase(job_id);
220  this->is_worker_idle_[status.MPI_SOURCE] = true;
221 
222  std::ostringstream dump;
223  dump << "worker finished job with ID " << job_id << std::endl;
224  this->job_trace_->log(dump);
225 
226 
227  // sampler already terminated, cannot accept new messages
228  if (this->has_opt_converged_) return true;
229 
230  int opt_master_rank = this->comm_->getLeader();
231  MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, opt_master_rank,
233 
234  MPI_Send_reqvars(res, opt_master_rank, this->opt_comm_);
235 
236  // we keep worker busy _after_ results have been sent to sampler
237  if (request_queue_.size() > 0)
238  sendNewJobToWorker(status.MPI_SOURCE);
239 
240  return true;
241  }
242 
243  case OPT_NEW_JOB_TAG: {
244 
245  size_t job_id = recv_value;
246  int opt_master_rank = this->comm_->getLeader();
247 
248  Param_t job_params;
249  MPI_Recv_params(job_params, (size_t)opt_master_rank, this->opt_comm_);
250 
251  request_queue_.insert(
252  std::pair<size_t, Param_t >(
253  job_id, job_params));
254 
255  std::ostringstream dump;
256  dump << "new opt job with ID " << job_id << std::endl;
257  this->job_trace_->log(dump);
258 
259  return true;
260  }
261 
262  case OPT_CONVERGED_TAG: {
263  return this->stop();
264  }
265 
267  this->is_worker_idle_[status.MPI_SOURCE] = true;
268  return true;
269  }
270 
271  default: {
272  std::string msg = "(Pilot) Error: unexpected MPI_TAG: ";
273  msg += status.MPI_TAG;
274  throw OptPilotException("SamplePilot::onMessage", msg);
275  }
276  }
277  }
278 
279 private:
280  std::map< std::string,
281  std::shared_ptr<SamplingMethod>
283 };
284 
285 #endif
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:64
void parseInputFile(functionDictionary_t known_expr_funcs, bool isOptimizationRun)
Definition: Pilot.h:219
boost::scoped_ptr< Trace > job_trace_
Definition: Pilot.h:185
bool continue_polling_
Definition: Pilot.h:166
std::string input_file_
input file for simulation with embedded optimization problem
Definition: Pilot.h:162
Jobs_t running_job_list_
Definition: SamplePilot.h:82
MPI_Comm worker_comm_
MPI communicator used for messages to/from worker.
Definition: Pilot.h:142
void startWorker(const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep)
Definition: SamplePilot.h:120
virtual void startSampler()
Definition: SamplePilot.h:104
virtual void startWorker()
Definition: Pilot.h:270
virtual ~SamplePilot()
Definition: SamplePilot.h:73
SamplePilot(CmdArguments_t args, boost::shared_ptr< Comm_t > comm, functionDictionary_t known_expr_funcs, const DVarContainer_t &dvar, const Expressions::Named_t &obj, const std::map< std::string, std::shared_ptr< SamplingMethod > > &sampleMethods, const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep)
Definition: SamplePilot.h:41
boost::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:169
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies the worker that it is ready to collect the results
Definition: MPIHelper.h:17
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:125
virtual void postPoll()
executed after handling new requests (if any)
Definition: SamplePilot.h:144
Expressions::Named_t objectives_
objectives
Definition: Pilot.h:168
std::map< std::string, std::shared_ptr< SamplingMethod > > sampleMethods_m
Definition: SamplePilot.h:282
sampleMethods_m(sampleMethods)
Definition: SamplePilot.h:61
namedVariableCollection_t Param_t
Definition: Types.h:33
Expression Expr_t
type of an expression
Definition: Expression.h:34
virtual void setup(functionDictionary_t known_expr_funcs, const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep)
Definition: SamplePilot.h:87
virtual void startPilot()
Definition: Pilot.h:292
A worker MPI entity consists of a processor group that runs a simulation of type Sim_t. The main loop in run() accepts new jobs from the master process, runs the simulation, and reports the results back.
Definition: SampleWorker.h:20
Jobs_t::iterator JobIter_t
Definition: SamplePilot.h:81
int total_available_workers_
Definition: Pilot.h:164
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:32
Expressions::Named_t constraints_
constraints
Definition: Pilot.h:169
virtual void sendNewJobToWorker(int worker)
Definition: SamplePilot.h:177
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:107
int global_rank_
Definition: Pilot.h:151
DVarContainer_t dvars_
design variables
Definition: Pilot.h:170
std::map< std::string, DVar_t > DVarContainer_t
Definition: Types.h:77
void MPI_Send_params(Param_t params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:66
std::map< std::string, Expressions::Expr_t * > Named_t
type of a named expression
Definition: Expression.h:45
virtual bool onMessage(MPI_Status status, size_t recv_value)
Definition: SamplePilot.h:203
boost::shared_ptr< Comm_t > comm_
Definition: Pilot.h:148
CmdArguments_t cmd_args_
Definition: Pilot.h:149
void MPI_Recv_reqvars(reqVarContainer_t &reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:144
The sample Pilot (Master): coordinates requests from the sampler to the workers.
Definition: SamplePilot.h:33
int my_rank_in_worker_comm_
Definition: Pilot.h:152
MPITag_t
Definition: MPIHelper.h:51
std::vector< bool > is_worker_idle_
Definition: Pilot.h:175
std::string::iterator iterator
Definition: MSLang.h:16
std::map< size_t, Param_t > Jobs_t
keep track of requests and running jobs
Definition: SamplePilot.h:80
bool stop(bool isOpt=true)
Definition: Pilot.h:554
#define MPI_OPT_JOB_FINISHED_TAG
pilot tells the optimizer that results are ready to be collected
Definition: MPIHelper.h:26
std::map< std::string, client::function::type > functionDictionary_t
Definition: Expression.h:27
Jobs_t request_queue_
Definition: SamplePilot.h:83
MPI_Comm opt_comm_
MPI communicator used for messages to/from optimizer.
Definition: Pilot.h:144
The Optimization Pilot (Master): coordinates requests from the optimizer to the workers and reports the results back to the optimizer.
Definition: Pilot.h:85
bool has_opt_converged_
Definition: Pilot.h:165
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
#define MPI_STOP_TAG
global stop tag to exit the poll loop
Definition: MPIHelper.h:44