OPAL (Object Oriented Parallel Accelerator Library)  2021.1.99
OPAL
SamplePilot.h
Go to the documentation of this file.
1 //
2 // Class SamplePilot
3 // The sample Pilot (Master): coordinates requests from the sampler to the workers.
4 // Every worker notifies the master here whether it is idle or busy. When a
5 // worker is available, the master dispatches one of the pending simulations to
6 // it; the worker runs the specified simulation and reports the results back to
7 // the master.
8 // @see SampleWorker
9 // @see Sampler
10 // @tparam Opt_t type of the sampler
11 // @tparam Sim_t type of the simulation
12 // @tparam SolPropagationGraph_t strategy to distribute solution between
13 // master islands
14 // @tparam Comm_t comm splitter strategy
15 //
16 // Copyright (c) 2018, Matthias Frey, Paul Scherrer Institut, Villigen PSI, Switzerland
17 // Yves Ineichen, ETH Zürich
18 // All rights reserved
19 //
20 // Implemented as part of the PhD thesis
21 // "Precise Simulations of Multibunches in High Intensity Cyclotrons"
22 //
23 // This file is part of OPAL.
24 //
25 // OPAL is free software: you can redistribute it and/or modify
26 // it under the terms of the GNU General Public License as published by
27 // the Free Software Foundation, either version 3 of the License, or
28 // (at your option) any later version.
29 //
30 // You should have received a copy of the GNU General Public License
31 // along with OPAL. If not, see <https://www.gnu.org/licenses/>.
32 //
33 #ifndef __SAMPLE_PILOT_H__
34 #define __SAMPLE_PILOT_H__
35 
36 #include "Pilot/Pilot.h"
37 #include "Sample/SampleWorker.h"
39 
40 template <
41  class Opt_t
42  , class Sim_t
43  , class SolPropagationGraph_t
44  , class Comm_t
45  >
46 class SamplePilot : protected Pilot<Opt_t,
47  Sim_t,
48  SolPropagationGraph_t,
49  Comm_t>
50 {
51 
52 public:
53 
54  SamplePilot(CmdArguments_t args, boost::shared_ptr<Comm_t> comm,
55  functionDictionary_t known_expr_funcs,
56  const DVarContainer_t &dvar,
57  const Expressions::Named_t &obj,
58  const std::map< std::string,
59  std::shared_ptr<SamplingMethod>
60  >& sampleMethods,
61  const std::vector<std::string> &storeobjstr,
62  const std::vector<std::string> &filesToKeep,
63  const std::map<std::string, std::string> &userVariables)
64  : Pilot<Opt_t,
65  Sim_t,
66  SolPropagationGraph_t,
67  Comm_t>(args,
68  comm,
69  known_expr_funcs,
70  dvar,
71  obj,
73  {},
74  false,
75  {})
76  , sampleMethods_m(sampleMethods)
77  {
78  if (obj.size() == 0) {
79  // create a dummy objective, base class requires at least 1 objective
80  this->objectives_ = {
81  {"dummy", new Expressions::Expr_t("dummy")}
82  };
83  }
84 
85  this->setup(known_expr_funcs, storeobjstr, filesToKeep, userVariables);
86  }
87 
88  virtual ~SamplePilot()
89  {}
90 
91 
92 protected:
93 
95  typedef std::map<size_t, Param_t > Jobs_t;
99 
100 
101  virtual
102  void setup(functionDictionary_t known_expr_funcs,
103  const std::vector<std::string> &storeobjstr,
104  const std::vector<std::string> &filesToKeep,
105  const std::map<std::string, std::string> &userVariables)
106  {
107  this->global_rank_ = this->comm_->globalRank();
108 
109  this->parseInputFile(known_expr_funcs, false);
110 
111  MPI_Barrier(MPI_COMM_WORLD);
112 
113  // here the control flow starts to diverge
114  if ( this->comm_->isOptimizer() ) { startSampler(); }
115  else if ( this->comm_->isWorker() ) { startWorker(storeobjstr, filesToKeep, userVariables); }
116  else if ( this->comm_->isPilot() ) { this->startPilot(); }
117  }
118 
119  virtual
120  void startSampler() {
121 
122  std::ostringstream os;
123  os << "\033[01;35m" << " " << this->global_rank_ << " (PID: " << getpid() << ") ▶ Sampler"
124  << "\e[0m" << std::endl;
125  std::cout << os.str() << std::flush;
126 
127  boost::scoped_ptr<Opt_t> opt(
128  new Opt_t(sampleMethods_m, this->objectives_, this->dvars_,
129  this->comm_->getBundle(), this->cmd_args_));
130  opt->initialize();
131 
132  std::cout << "Stop Sampler.." << std::endl;
133  }
134 
136  void startWorker(const std::vector<std::string> &storeobjstr,
137  const std::vector<std::string> &filesToKeep,
138  const std::map<std::string, std::string> &userVariables)
139  {
140  std::ostringstream os;
141  os << "\033[01;35m" << " " << this->global_rank_ << " (PID: " << getpid() << ") ▶ Worker"
142  << "\e[0m" << std::endl;
143  std::cout << os.str() << std::flush;
144 
145  size_t pos = this->input_file_.find_last_of("/");
146  std::string tmplfile = this->input_file_;
147  if (pos != std::string::npos)
148  tmplfile = this->input_file_.substr(pos+1);
149  pos = tmplfile.find(".");
150  std::string simName = tmplfile.substr(0,pos);
151 
152  boost::scoped_ptr< SampleWorker<Sim_t> > w(
153  new SampleWorker<Sim_t>(this->objectives_, this->constraints_, simName,
154  this->comm_->getBundle(), this->cmd_args_,
155  storeobjstr, filesToKeep, userVariables));
156 
157  std::cout << "Stop Worker.." << std::endl;
158  }
159 
160  virtual
161  void postPoll() {
162 
163  // terminating all workers is tricky since we do not know their state.
164  // All workers are notified (to terminate) when opt has converged and
165  // all workers are idle.
166  bool all_worker_idle = true;
167 
168  // in the case where new requests became available after worker
169  // delivered last results (and switched to idle state).
170  for(int i = 0; i < this->total_available_workers_; i++) {
171 
172  if (i == this->my_rank_in_worker_comm_) continue;
173 
174  if (this->is_worker_idle_[i] && request_queue_.size() > 0)
176 
177  all_worker_idle = all_worker_idle && this->is_worker_idle_[i];
178  }
179 
180  // when all workers have been notified we can stop polling
181  if (all_worker_idle && this->has_opt_converged_) {
182  this->continue_polling_ = false;
183  int dummy = 0;
184  for(int worker = 0; worker < this->total_available_workers_; worker++) {
185  MPI_Request req;
186  MPI_Isend(&dummy, 1, MPI_INT, worker,
187  MPI_STOP_TAG, this->worker_comm_, &req);
188  }
189  }
190  }
191 
192 
193  virtual
194  void sendNewJobToWorker(int worker) /*override*/ {
195 
196  // no new jobs once our opt has converged
197  if (this->has_opt_converged_) return;
198 
199  JobIter_t job = request_queue_.begin();
200  size_t jid = job->first;
201 
202  Param_t job_params = job->second;
203  MPI_Send(&jid, 1, MPI_UNSIGNED_LONG, worker, MPI_WORK_JOBID_TAG, this->worker_comm_);
204  MPI_Send_params(job_params, worker, this->worker_comm_);
205 
206  running_job_list_.insert(std::pair<size_t,
207  Param_t >(job->first, job->second));
208  request_queue_.erase(jid);
209  this->is_worker_idle_[worker] = false;
210 
211  std::ostringstream dump;
212  dump << "sent job with ID " << jid << " to worker " << worker
213  << std::endl;
214  this->job_trace_->log(dump);
215 
216  }
217 
218 
219  virtual
220  bool onMessage(MPI_Status status, size_t recv_value) /*override*/ {
221 
222  MPITag_t tag = MPITag_t(status.MPI_TAG);
223  switch(tag) {
224 
225  case WORKER_FINISHED_TAG: {
226 
227  size_t job_id = recv_value;
228 
229  size_t dummy = 1;
230  MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, status.MPI_SOURCE,
231  MPI_WORKER_FINISHED_ACK_TAG, this->worker_comm_);
232 
233  reqVarContainer_t res;
234  MPI_Recv_reqvars(res, status.MPI_SOURCE, this->worker_comm_);
235 
236  running_job_list_.erase(job_id);
237  this->is_worker_idle_[status.MPI_SOURCE] = true;
238 
239  std::ostringstream dump;
240  dump << "worker finished job with ID " << job_id << std::endl;
241  this->job_trace_->log(dump);
242 
243 
244  // sampler already terminated, cannot accept new messages
245  if (this->has_opt_converged_) return true;
246 
247  int opt_master_rank = this->comm_->getLeader();
248  MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, opt_master_rank,
250 
251  MPI_Send_reqvars(res, opt_master_rank, this->opt_comm_);
252 
253  // we keep worker busy _after_ results have been sent to sampler
254  if (request_queue_.size() > 0)
255  sendNewJobToWorker(status.MPI_SOURCE);
256 
257  return true;
258  }
259 
260  case OPT_NEW_JOB_TAG: {
261 
262  size_t job_id = recv_value;
263  int opt_master_rank = this->comm_->getLeader();
264 
265  Param_t job_params;
266  MPI_Recv_params(job_params, (size_t)opt_master_rank, this->opt_comm_);
267 
268  request_queue_.insert(
269  std::pair<size_t, Param_t >(
270  job_id, job_params));
271 
272  std::ostringstream dump;
273  dump << "new opt job with ID " << job_id << std::endl;
274  this->job_trace_->log(dump);
275 
276  return true;
277  }
278 
279  case OPT_CONVERGED_TAG: {
280  return this->stop();
281  }
282 
284  this->is_worker_idle_[status.MPI_SOURCE] = true;
285  return true;
286  }
287 
288  default: {
289  std::string msg = "(Pilot) Error: unexpected MPI_TAG: ";
290  msg += status.MPI_TAG;
291  throw OptPilotException("SamplePilot::onMessage", msg);
292  }
293  }
294  }
295 
296 private:
297  std::map< std::string,
298  std::shared_ptr<SamplingMethod>
300 };
301 
302 #endif
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
std::map< std::string, client::function::type > functionDictionary_t
Definition: Expression.h:56
boost::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:176
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:146
void MPI_Send_params(Param_t params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:87
void MPI_Recv_reqvars(reqVarContainer_t &reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:165
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:128
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:52
#define MPI_OPT_JOB_FINISHED_TAG
pilot tells optimizer that results are ready to collect
Definition: MPIHelper.h:46
MPITag_t
Definition: MPIHelper.h:71
@ WORKER_FINISHED_TAG
Definition: MPIHelper.h:72
@ OPT_CONVERGED_TAG
Definition: MPIHelper.h:74
@ OPT_NEW_JOB_TAG
Definition: MPIHelper.h:73
@ WORKER_STATUSUPDATE_TAG
Definition: MPIHelper.h:75
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies worker that it is ready to collect the results
Definition: MPIHelper.h:37
#define MPI_STOP_TAG
global stop tag to exit poll loop
Definition: MPIHelper.h:64
std::map< std::string, DVar_t > DVarContainer_t
Definition: Types.h:92
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:79
namedVariableCollection_t Param_t
Definition: Types.h:48
Representation objects and parsers for attribute expressions.
Definition: Expressions.h:64
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expressions with a name
Definition: Expression.h:74
Expression Expr_t
type of an expression
Definition: Expression.h:63
std::string::iterator iterator
Definition: MSLang.h:16
virtual void setup(functionDictionary_t known_expr_funcs, const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep, const std::map< std::string, std::string > &userVariables)
Definition: SamplePilot.h:102
std::map< std::string, std::shared_ptr< SamplingMethod > > sampleMethods_m
Definition: SamplePilot.h:299
Jobs_t running_job_list_
Definition: SamplePilot.h:97
std::map< size_t, Param_t > Jobs_t
keep track of requests and running jobs
Definition: SamplePilot.h:95
virtual void postPoll()
executed after handling (if any) new request
Definition: SamplePilot.h:161
virtual ~SamplePilot()
Definition: SamplePilot.h:88
SamplePilot(CmdArguments_t args, boost::shared_ptr< Comm_t > comm, functionDictionary_t known_expr_funcs, const DVarContainer_t &dvar, const Expressions::Named_t &obj, const std::map< std::string, std::shared_ptr< SamplingMethod > > &sampleMethods, const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep, const std::map< std::string, std::string > &userVariables)
Definition: SamplePilot.h:54
virtual void sendNewJobToWorker(int worker)
Definition: SamplePilot.h:194
virtual bool onMessage(MPI_Status status, size_t recv_value)
Definition: SamplePilot.h:220
Jobs_t::iterator JobIter_t
Definition: SamplePilot.h:96
Jobs_t request_queue_
Definition: SamplePilot.h:98
virtual void startSampler()
Definition: SamplePilot.h:120
void startWorker(const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep, const std::map< std::string, std::string > &userVariables)
Definition: SamplePilot.h:136
Definition: Pilot.h:102
bool has_opt_converged_
Definition: Pilot.h:183
int total_available_workers_
Definition: Pilot.h:182
boost::shared_ptr< Comm_t > comm_
Definition: Pilot.h:166
std::string input_file_
input file for simulation with embedded optimization problem
Definition: Pilot.h:180
std::vector< bool > is_worker_idle_
Definition: Pilot.h:193
virtual void startPilot()
Definition: Pilot.h:311
int global_rank_
Definition: Pilot.h:169
bool stop(bool isOpt=true)
Definition: Pilot.h:573
int my_rank_in_worker_comm_
Definition: Pilot.h:170
Expressions::Named_t constraints_
constraints
Definition: Pilot.h:187
bool continue_polling_
Definition: Pilot.h:184
MPI_Comm opt_comm_
MPI communicator used for messages to/from optimizer.
Definition: Pilot.h:162
boost::scoped_ptr< Trace > job_trace_
Definition: Pilot.h:203
Expressions::Named_t objectives_
objectives
Definition: Pilot.h:186
MPI_Comm worker_comm_
MPI communicator used for messages to/from worker.
Definition: Pilot.h:160
DVarContainer_t dvars_
design variables
Definition: Pilot.h:188
void parseInputFile(functionDictionary_t, bool isOptimizationRun)
Definition: Pilot.h:238