OPAL (Object Oriented Parallel Accelerator Library)  2024.1
OPAL
SamplePilot.h
Go to the documentation of this file.
1 //
2 // Class SamplePilot
3 // The sample Pilot (Master): Coordinates requests by sampler to workers.
4 // Every worker thread notifies the master here if idle or not. When
5 // available the master dispatches one of the pending simulations to the
6 // worker who will run the specified simulation and report results back to
7 // the master.
8 // @see SampleWorker
9 // @see Sampler
10 // @tparam Opt_t type of the sampler
11 // @tparam Sim_t type of the simulation
12 // @tparam SolPropagationGraph_t strategy to distribute solution between
13 // master islands
14 // @tparam Comm_t comm splitter strategy
15 //
16 // Copyright (c) 2018, Matthias Frey, Paul Scherrer Institut, Villigen PSI, Switzerland
17 // Yves Ineichen, ETH Zürich
18 // All rights reserved
19 //
20 // Implemented as part of the PhD thesis
21 // "Precise Simulations of Multibunches in High Intensity Cyclotrons"
22 //
23 // This file is part of OPAL.
24 //
25 // OPAL is free software: you can redistribute it and/or modify
26 // it under the terms of the GNU General Public License as published by
27 // the Free Software Foundation, either version 3 of the License, or
28 // (at your option) any later version.
29 //
30 // You should have received a copy of the GNU General Public License
31 // along with OPAL. If not, see <https://www.gnu.org/licenses/>.
32 //
33 #ifndef __SAMPLE_PILOT_H__
34 #define __SAMPLE_PILOT_H__
35 
36 #include "Pilot/Pilot.h"
37 #include "Sample/SampleWorker.h"
39 
40 template <
41  class Opt_t
42  , class Sim_t
43  , class SolPropagationGraph_t
44  , class Comm_t
45  >
46 class SamplePilot : protected Pilot<Opt_t,
47  Sim_t,
48  SolPropagationGraph_t,
49  Comm_t>
50 {
51 
52 public:
53 
54  SamplePilot(CmdArguments_t args, std::shared_ptr<Comm_t> comm,
55  functionDictionary_t known_expr_funcs,
56  const DVarContainer_t &dvar,
57  const Expressions::Named_t &obj,
58  const std::map< std::string,
59  std::shared_ptr<SamplingMethod>
60  >& sampleMethods,
61  const std::vector<std::string> &storeobjstr,
62  const std::vector<std::string> &filesToKeep,
63  const std::map<std::string, std::string> &userVariables)
64  : Pilot<Opt_t,
65  Sim_t,
66  SolPropagationGraph_t,
67  Comm_t>(args,
68  comm,
69  known_expr_funcs,
70  dvar,
71  obj,
72  Expressions::Named_t(),
73  {},
74  false,
75  {})
76  , sampleMethods_m(sampleMethods)
77  {
78  if (obj.empty()) {
79  // create a dummy objective, base class requires at least 1 objective
80  this->objectives_ = {
81  {"dummy", new Expressions::Expr_t("dummy")}
82  };
83  }
84 
85  this->setup(known_expr_funcs, storeobjstr, filesToKeep, userVariables);
86  }
87 
88  virtual ~SamplePilot()
89  {}
90 
91 
92 protected:
93 
95  typedef std::map<size_t, Param_t > Jobs_t;
99 
100 
101  virtual
102  void setup(functionDictionary_t known_expr_funcs,
103  const std::vector<std::string> &storeobjstr,
104  const std::vector<std::string> &filesToKeep,
105  const std::map<std::string, std::string> &userVariables)
106  {
107  this->global_rank_ = this->comm_->globalRank();
108 
109  this->parseInputFile(known_expr_funcs, false);
110 
111  MPI_Barrier(MPI_COMM_WORLD);
112 
113  // here the control flow starts to diverge
114  if ( this->comm_->isOptimizer() ) { startSampler(); }
115  else if ( this->comm_->isWorker() ) { startWorker(storeobjstr, filesToKeep, userVariables); }
116  else if ( this->comm_->isPilot() ) { this->startPilot(); }
117  }
118 
119  virtual
120  void startSampler() {
121 
122  std::ostringstream os;
123  os << "\033[01;35m" << " " << this->global_rank_ << " (PID: " << getpid() << ") ▶ Sampler"
124  << "\e[0m" << std::endl;
125  std::cout << os.str() << std::flush;
126 
127  const std::unique_ptr<Opt_t> opt(
128  new Opt_t(sampleMethods_m, this->objectives_, this->dvars_,
129  this->comm_->getBundle(), this->cmd_args_));
130  opt->initialize();
131 
132  std::cout << "Stop Sampler.." << std::endl;
133  }
134 
136  void startWorker(const std::vector<std::string> &storeobjstr,
137  const std::vector<std::string> &filesToKeep,
138  const std::map<std::string, std::string> &userVariables)
139  {
140  std::ostringstream os;
141  os << "\033[01;35m" << " " << this->global_rank_ << " (PID: " << getpid() << ") ▶ Worker"
142  << "\e[0m" << std::endl;
143  std::cout << os.str() << std::flush;
144 
145  size_t pos = this->input_file_.find_last_of("/");
146  std::string tmplfile = this->input_file_;
147  if (pos != std::string::npos)
148  tmplfile = this->input_file_.substr(pos+1);
149  pos = tmplfile.find(".");
150  std::string simName = tmplfile.substr(0,pos);
151 
152  const std::unique_ptr< SampleWorker<Sim_t> > w(
153  new SampleWorker<Sim_t>(this->objectives_, this->constraints_, simName,
154  this->comm_->getBundle(), this->cmd_args_,
155  storeobjstr, filesToKeep, userVariables));
156 
157  std::cout << "Stop Worker.." << std::endl;
158  }
159 
160  virtual
161  void postPoll() {
162 
163  // terminating all workers is tricky since we do not know their state.
164  // All workers are notified (to terminate) when opt has converged and
165  // all workers are idle.
166  bool all_worker_idle = true;
167 
168  // in the case where new requests became available after worker
169  // delivered last results (and switched to idle state).
170  for(int i = 0; i < this->total_available_workers_; i++) {
171 
172  if (i == this->my_rank_in_worker_comm_) continue;
173 
174  if (this->is_worker_idle_[i] && !request_queue_.empty())
176 
177  all_worker_idle = all_worker_idle && this->is_worker_idle_[i];
178  }
179 
180  // when all workers have been notified we can stop polling
181  if (all_worker_idle && this->has_opt_converged_) {
182  this->continue_polling_ = false;
183  int dummy = 0;
184  for(int worker = 0; worker < this->total_available_workers_; worker++) {
185  MPI_Request req;
186  MPI_Isend(&dummy, 1, MPI_INT, worker,
187  MPI_STOP_TAG, this->worker_comm_, &req);
188  }
189  }
190  }
191 
192 
193  virtual
194  void sendNewJobToWorker(int worker) /*override*/ {
195 
196  // no new jobs once our opt has converged
197  if (this->has_opt_converged_) return;
198 
199  JobIter_t job = request_queue_.begin();
200  size_t jid = job->first;
201 
202  Param_t job_params = job->second;
203  MPI_Send(&jid, 1, MPI_UNSIGNED_LONG, worker, MPI_WORK_JOBID_TAG, this->worker_comm_);
204  MPI_Send_params(job_params, worker, this->worker_comm_);
205 
206  running_job_list_.insert(std::pair<size_t,
207  Param_t >(job->first, job->second));
208  request_queue_.erase(jid);
209  this->is_worker_idle_[worker] = false;
210 
211  std::ostringstream dump;
212  dump << "sent job with ID " << jid << " to worker " << worker
213  << std::endl;
214  this->job_trace_->log(dump);
215 
216  }
217 
218 
219  virtual
220  bool onMessage(MPI_Status status, size_t recv_value) /*override*/ {
221 
222  MPITag_t tag = MPITag_t(status.MPI_TAG);
223  switch(tag) {
224 
225  case WORKER_FINISHED_TAG: {
226 
227  size_t job_id = recv_value;
228 
229  size_t dummy = 1;
230  MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, status.MPI_SOURCE,
231  MPI_WORKER_FINISHED_ACK_TAG, this->worker_comm_);
232 
233  reqVarContainer_t res;
234  MPI_Recv_reqvars(res, status.MPI_SOURCE, this->worker_comm_);
235 
236  running_job_list_.erase(job_id);
237  this->is_worker_idle_[status.MPI_SOURCE] = true;
238 
239  std::ostringstream dump;
240  dump << "worker finished job with ID " << job_id << std::endl;
241  this->job_trace_->log(dump);
242 
243 
244  // sampler already terminated, cannot accept new messages
245  if (this->has_opt_converged_) return true;
246 
247  int opt_master_rank = this->comm_->getLeader();
248  MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, opt_master_rank,
250 
251  MPI_Send_reqvars(res, opt_master_rank, this->opt_comm_);
252 
253  // we keep worker busy _after_ results have been sent to sampler
254  if (!request_queue_.empty())
255  sendNewJobToWorker(status.MPI_SOURCE);
256 
257  return true;
258  }
259 
260  case OPT_NEW_JOB_TAG: {
261 
262  size_t job_id = recv_value;
263  int opt_master_rank = this->comm_->getLeader();
264 
265  Param_t job_params;
266  MPI_Recv_params(job_params, (size_t)opt_master_rank, this->opt_comm_);
267 
268  request_queue_.insert(
269  std::pair<size_t, Param_t >(
270  job_id, job_params));
271 
272  std::ostringstream dump;
273  dump << "new opt job with ID " << job_id << std::endl;
274  this->job_trace_->log(dump);
275 
276  return true;
277  }
278 
279  case OPT_CONVERGED_TAG: {
280  return this->stop();
281  }
282 
284  this->is_worker_idle_[status.MPI_SOURCE] = true;
285  return true;
286  }
287 
288  default: {
289  std::string msg = "(Pilot) Error: unexpected MPI_TAG: ";
290  msg += status.MPI_TAG;
291  throw OptPilotException("SamplePilot::onMessage", msg);
292  }
293  }
294  }
295 
296 private:
297  std::map< std::string,
298  std::shared_ptr<SamplingMethod>
300 };
301 
302 #endif
Expression Expr_t
type of an expression
Definition: Expression.h:63
DVarContainer_t dvars_
design variables
Definition: Pilot.h:185
virtual void sendNewJobToWorker(int worker)
Definition: SamplePilot.h:194
int total_available_workers_
Definition: Pilot.h:179
namedVariableCollection_t Param_t
Definition: Types.h:48
int my_rank_in_worker_comm_
Definition: Pilot.h:167
std::string input_file_
input file for simulation with embedded optimization problem
Definition: Pilot.h:177
and that you know you can do these things. To protect your rights, we need to make restrictions that forbid anyone to deny you these rights or to ask you to surrender the rights. These restrictions translate to certain responsibilities for you if you distribute copies of the software, or if you modify it. For example, if you distribute copies of such a program, whether gratis or for a fee, you must give the recipients all the rights that you have. You must make sure that they, too, receive or can get the source code. And you must show them these terms so they know their rights. We protect your rights with two steps: (1) copyright the software, and (2) offer you this license which gives you legal permission to copy, distribute and/or modify the software. Also, for each author's protection and ours, we want to make certain that everyone understands that there is no warranty for this free software. If the software is modified by someone else and passed on, we want its recipients to know that what they have is not the original, so that any problems introduced by others will not reflect on the original authors' reputations. Finally, any free program is threatened constantly by software patents. We wish to avoid the danger that redistributors of a free program will individually obtain patent licenses, in effect making the program proprietary. To prevent this…
Definition: LICENSE:43
virtual void postPoll()
executed after handling (if any) new request
Definition: SamplePilot.h:161
std::vector< bool > is_worker_idle_
Definition: Pilot.h:190
Expressions::Named_t objectives_
objectives
Definition: Pilot.h:183
std::map< size_t, Param_t > Jobs_t
Definition: SamplePilot.h:95
Definition: Pilot.h:99
void parseInputFile(functionDictionary_t, bool isOptimizationRun)
Definition: Pilot.h:234
std::unique_ptr< Trace > job_trace_
Definition: Pilot.h:199
virtual void startPilot()
Definition: Pilot.h:307
std::map< std::string, DVar_t > DVarContainer_t
Definition: Types.h:92
Expressions::Named_t constraints_
constraints
Definition: Pilot.h:184
void startWorker(const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep, const std::map< std::string, std::string > &userVariables)
Definition: SamplePilot.h:136
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expressions with a name
Definition: Expression.h:74
MPI_Comm worker_comm_
MPI communicator used for messages to/from worker.
Definition: Pilot.h:157
std::string::iterator iterator
Definition: MSLang.h:15
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:52
std::map< std::string, std::shared_ptr< SamplingMethod > > sampleMethods_m
Definition: SamplePilot.h:299
#define MPI_OPT_JOB_FINISHED_TAG
pilot tells optimizer that results are ready to collect
Definition: MPIHelper.h:46
int global_rank_
Definition: Pilot.h:166
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:146
Jobs_t request_queue_
Definition: SamplePilot.h:98
std::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:176
SamplePilot(CmdArguments_t args, std::shared_ptr< Comm_t > comm, functionDictionary_t known_expr_funcs, const DVarContainer_t &dvar, const Expressions::Named_t &obj, const std::map< std::string, std::shared_ptr< SamplingMethod > > &sampleMethods, const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep, const std::map< std::string, std::string > &userVariables)
Definition: SamplePilot.h:54
std::map< std::string, client::function::type > functionDictionary_t
Definition: Expression.h:56
bool continue_polling_
Definition: Pilot.h:181
bool stop(bool isOpt=true)
Definition: Pilot.h:569
#define MPI_STOP_TAG
global stop tag to exit poll loop (
Definition: MPIHelper.h:64
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies the worker that it is ready to collect the results
Definition: MPIHelper.h:37
void MPI_Recv_reqvars(reqVarContainer_t &reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:165
Jobs_t running_job_list_
Definition: SamplePilot.h:97
Jobs_t::iterator JobIter_t
Definition: SamplePilot.h:96
void MPI_Send_params(Param_t params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:87
std::shared_ptr< Comm_t > comm_
Definition: Pilot.h:163
MPI_Comm opt_comm_
MPI communicator used for messages to/from optimizer.
Definition: Pilot.h:159
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:128
virtual bool onMessage(MPI_Status status, size_t recv_value)
Definition: SamplePilot.h:220
virtual void startSampler()
Definition: SamplePilot.h:120
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:79
bool has_opt_converged_
Definition: Pilot.h:180
MPITag_t
Definition: MPIHelper.h:71
CmdArguments_t cmd_args_
Definition: Pilot.h:164
virtual void setup(functionDictionary_t known_expr_funcs, const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep, const std::map< std::string, std::string > &userVariables)
Definition: SamplePilot.h:102