OPAL (Object Oriented Parallel Accelerator Library) 2022.1
OPAL
SamplePilot.h
Go to the documentation of this file.
1//
2// Class SamplePilot
3// The sample Pilot (Master): Coordinates requests by sampler to workers.
4// Every worker thread notifies the master here if idle or not. When
5// available the master dispatches one of the pending simulations to the
6// worker who will run the specified simulation and report results back to
7// the master.
8// @see SampleWorker
9// @see Sampler
10// @tparam Opt_t type of the sampler
11// @tparam Sim_t type of the simulation
12// @tparam SolPropagationGraph_t strategy to distribute solution between
13// master islands
14// @tparam Comm_t comm splitter strategy
15//
16// Copyright (c) 2018, Matthias Frey, Paul Scherrer Institut, Villigen PSI, Switzerland
17// Yves Ineichen, ETH Zürich
18// All rights reserved
19//
20// Implemented as part of the PhD thesis
21// "Precise Simulations of Multibunches in High Intensity Cyclotrons"
22//
23// This file is part of OPAL.
24//
25// OPAL is free software: you can redistribute it and/or modify
26// it under the terms of the GNU General Public License as published by
27// the Free Software Foundation, either version 3 of the License, or
28// (at your option) any later version.
29//
30// You should have received a copy of the GNU General Public License
31// along with OPAL. If not, see <https://www.gnu.org/licenses/>.
32//
33#ifndef __SAMPLE_PILOT_H__
34#define __SAMPLE_PILOT_H__
35
36#include "Pilot/Pilot.h"
37#include "Sample/SampleWorker.h"
39
40template <
41 class Opt_t
42 , class Sim_t
43 , class SolPropagationGraph_t
44 , class Comm_t
45 >
46class SamplePilot : protected Pilot<Opt_t,
47 Sim_t,
48 SolPropagationGraph_t,
49 Comm_t>
50{
51
52public:
53
54 SamplePilot(CmdArguments_t args, boost::shared_ptr<Comm_t> comm,
55 functionDictionary_t known_expr_funcs,
56 const DVarContainer_t &dvar,
57 const Expressions::Named_t &obj,
58 const std::map< std::string,
59 std::shared_ptr<SamplingMethod>
60 >& sampleMethods,
61 const std::vector<std::string> &storeobjstr,
62 const std::vector<std::string> &filesToKeep,
63 const std::map<std::string, std::string> &userVariables)
64 : Pilot<Opt_t,
65 Sim_t,
66 SolPropagationGraph_t,
67 Comm_t>(args,
68 comm,
69 known_expr_funcs,
70 dvar,
71 obj,
73 {},
74 false,
75 {})
76 , sampleMethods_m(sampleMethods)
77 {
78 if (obj.empty()) {
79 // create a dummy objective, base class requires at least 1 objective
80 this->objectives_ = {
81 {"dummy", new Expressions::Expr_t("dummy")}
82 };
83 }
84
85 this->setup(known_expr_funcs, storeobjstr, filesToKeep, userVariables);
86 }
87
88 virtual ~SamplePilot()
89 {}
90
91
92protected:
93
95 typedef std::map<size_t, Param_t > Jobs_t;
99
100
101 virtual
102 void setup(functionDictionary_t known_expr_funcs,
103 const std::vector<std::string> &storeobjstr,
104 const std::vector<std::string> &filesToKeep,
105 const std::map<std::string, std::string> &userVariables)
106 {
107 this->global_rank_ = this->comm_->globalRank();
108
109 this->parseInputFile(known_expr_funcs, false);
110
111 MPI_Barrier(MPI_COMM_WORLD);
112
113 // here the control flow starts to diverge
114 if ( this->comm_->isOptimizer() ) { startSampler(); }
115 else if ( this->comm_->isWorker() ) { startWorker(storeobjstr, filesToKeep, userVariables); }
116 else if ( this->comm_->isPilot() ) { this->startPilot(); }
117 }
118
119 virtual
121
122 std::ostringstream os;
123 os << "\033[01;35m" << " " << this->global_rank_ << " (PID: " << getpid() << ") ▶ Sampler"
124 << "\e[0m" << std::endl;
125 std::cout << os.str() << std::flush;
126
127 boost::scoped_ptr<Opt_t> opt(
128 new Opt_t(sampleMethods_m, this->objectives_, this->dvars_,
129 this->comm_->getBundle(), this->cmd_args_));
130 opt->initialize();
131
132 std::cout << "Stop Sampler.." << std::endl;
133 }
134
136 void startWorker(const std::vector<std::string> &storeobjstr,
137 const std::vector<std::string> &filesToKeep,
138 const std::map<std::string, std::string> &userVariables)
139 {
140 std::ostringstream os;
141 os << "\033[01;35m" << " " << this->global_rank_ << " (PID: " << getpid() << ") ▶ Worker"
142 << "\e[0m" << std::endl;
143 std::cout << os.str() << std::flush;
144
145 size_t pos = this->input_file_.find_last_of("/");
146 std::string tmplfile = this->input_file_;
147 if (pos != std::string::npos)
148 tmplfile = this->input_file_.substr(pos+1);
149 pos = tmplfile.find(".");
150 std::string simName = tmplfile.substr(0,pos);
151
152 boost::scoped_ptr< SampleWorker<Sim_t> > w(
153 new SampleWorker<Sim_t>(this->objectives_, this->constraints_, simName,
154 this->comm_->getBundle(), this->cmd_args_,
155 storeobjstr, filesToKeep, userVariables));
156
157 std::cout << "Stop Worker.." << std::endl;
158 }
159
160 virtual
161 void postPoll() {
162
163 // terminating all workers is tricky since we do not know their state.
164 // All workers are notified (to terminate) when opt has converged and
165 // all workers are idle.
166 bool all_worker_idle = true;
167
168 // in the case where new requests became available after worker
169 // delivered last results (and switched to idle state).
170 for(int i = 0; i < this->total_available_workers_; i++) {
171
172 if (i == this->my_rank_in_worker_comm_) continue;
173
174 if (this->is_worker_idle_[i] && !request_queue_.empty())
176
177 all_worker_idle = all_worker_idle && this->is_worker_idle_[i];
178 }
179
180 // when all workers have been notified we can stop polling
181 if (all_worker_idle && this->has_opt_converged_) {
182 this->continue_polling_ = false;
183 int dummy = 0;
184 for(int worker = 0; worker < this->total_available_workers_; worker++) {
185 MPI_Request req;
186 MPI_Isend(&dummy, 1, MPI_INT, worker,
187 MPI_STOP_TAG, this->worker_comm_, &req);
188 }
189 }
190 }
191
192
193 virtual
194 void sendNewJobToWorker(int worker) /*override*/ {
195
196 // no new jobs once our opt has converged
197 if (this->has_opt_converged_) return;
198
199 JobIter_t job = request_queue_.begin();
200 size_t jid = job->first;
201
202 Param_t job_params = job->second;
203 MPI_Send(&jid, 1, MPI_UNSIGNED_LONG, worker, MPI_WORK_JOBID_TAG, this->worker_comm_);
204 MPI_Send_params(job_params, worker, this->worker_comm_);
205
206 running_job_list_.insert(std::pair<size_t,
207 Param_t >(job->first, job->second));
208 request_queue_.erase(jid);
209 this->is_worker_idle_[worker] = false;
210
211 std::ostringstream dump;
212 dump << "sent job with ID " << jid << " to worker " << worker
213 << std::endl;
214 this->job_trace_->log(dump);
215
216 }
217
218
219 virtual
220 bool onMessage(MPI_Status status, size_t recv_value) /*override*/ {
221
222 MPITag_t tag = MPITag_t(status.MPI_TAG);
223 switch(tag) {
224
225 case WORKER_FINISHED_TAG: {
226
227 size_t job_id = recv_value;
228
229 size_t dummy = 1;
230 MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, status.MPI_SOURCE,
231 MPI_WORKER_FINISHED_ACK_TAG, this->worker_comm_);
232
234 MPI_Recv_reqvars(res, status.MPI_SOURCE, this->worker_comm_);
235
236 running_job_list_.erase(job_id);
237 this->is_worker_idle_[status.MPI_SOURCE] = true;
238
239 std::ostringstream dump;
240 dump << "worker finished job with ID " << job_id << std::endl;
241 this->job_trace_->log(dump);
242
243
244 // sampler already terminated, cannot accept new messages
245 if (this->has_opt_converged_) return true;
246
247 int opt_master_rank = this->comm_->getLeader();
248 MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, opt_master_rank,
250
251 MPI_Send_reqvars(res, opt_master_rank, this->opt_comm_);
252
253 // we keep worker busy _after_ results have been sent to sampler
254 if (!request_queue_.empty())
255 sendNewJobToWorker(status.MPI_SOURCE);
256
257 return true;
258 }
259
260 case OPT_NEW_JOB_TAG: {
261
262 size_t job_id = recv_value;
263 int opt_master_rank = this->comm_->getLeader();
264
265 Param_t job_params;
266 MPI_Recv_params(job_params, (size_t)opt_master_rank, this->opt_comm_);
267
268 request_queue_.insert(
269 std::pair<size_t, Param_t >(
270 job_id, job_params));
271
272 std::ostringstream dump;
273 dump << "new opt job with ID " << job_id << std::endl;
274 this->job_trace_->log(dump);
275
276 return true;
277 }
278
279 case OPT_CONVERGED_TAG: {
280 return this->stop();
281 }
282
284 this->is_worker_idle_[status.MPI_SOURCE] = true;
285 return true;
286 }
287
288 default: {
289 std::string msg = "(Pilot) Error: unexpected MPI_TAG: ";
290 msg += status.MPI_TAG;
291 throw OptPilotException("SamplePilot::onMessage", msg);
292 }
293 }
294 }
295
296private:
297 std::map< std::string,
298 std::shared_ptr<SamplingMethod>
300};
301
302#endif
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:52
#define MPI_OPT_JOB_FINISHED_TAG
pilot tells optimizer that results are ready to collect
Definition: MPIHelper.h:46
MPITag_t
Definition: MPIHelper.h:71
@ WORKER_FINISHED_TAG
Definition: MPIHelper.h:72
@ OPT_CONVERGED_TAG
Definition: MPIHelper.h:74
@ OPT_NEW_JOB_TAG
Definition: MPIHelper.h:73
@ WORKER_STATUSUPDATE_TAG
Definition: MPIHelper.h:75
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies worker that it is ready to collect the results
Definition: MPIHelper.h:37
#define MPI_STOP_TAG
global stop tag to exit poll loop
Definition: MPIHelper.h:64
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:146
void MPI_Send_params(Param_t params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:87
void MPI_Recv_reqvars(reqVarContainer_t &reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:165
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:128
std::map< std::string, DVar_t > DVarContainer_t
Definition: Types.h:92
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:79
namedVariableCollection_t Param_t
Definition: Types.h:48
boost::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:176
std::map< std::string, client::function::type > functionDictionary_t
Definition: Expression.h:56
Representation objects and parsers for attribute expressions.
Definition: Expressions.h:64
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expression with a name
Definition: Expression.h:74
Expression Expr_t
type of an expression
Definition: Expression.h:63
std::string::iterator iterator
Definition: MSLang.h:16
virtual void setup(functionDictionary_t known_expr_funcs, const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep, const std::map< std::string, std::string > &userVariables)
Definition: SamplePilot.h:102
std::map< std::string, std::shared_ptr< SamplingMethod > > sampleMethods_m
Definition: SamplePilot.h:299
Jobs_t running_job_list_
Definition: SamplePilot.h:97
std::map< size_t, Param_t > Jobs_t
keep track of requests and running jobs
Definition: SamplePilot.h:95
virtual void postPoll()
executed after handling (if any) new request
Definition: SamplePilot.h:161
virtual ~SamplePilot()
Definition: SamplePilot.h:88
SamplePilot(CmdArguments_t args, boost::shared_ptr< Comm_t > comm, functionDictionary_t known_expr_funcs, const DVarContainer_t &dvar, const Expressions::Named_t &obj, const std::map< std::string, std::shared_ptr< SamplingMethod > > &sampleMethods, const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep, const std::map< std::string, std::string > &userVariables)
Definition: SamplePilot.h:54
virtual void sendNewJobToWorker(int worker)
Definition: SamplePilot.h:194
virtual bool onMessage(MPI_Status status, size_t recv_value)
Definition: SamplePilot.h:220
Jobs_t::iterator JobIter_t
Definition: SamplePilot.h:96
Jobs_t request_queue_
Definition: SamplePilot.h:98
virtual void startSampler()
Definition: SamplePilot.h:120
void startWorker(const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep, const std::map< std::string, std::string > &userVariables)
Definition: SamplePilot.h:136
Definition: Pilot.h:102
bool has_opt_converged_
Definition: Pilot.h:183
int total_available_workers_
Definition: Pilot.h:182
boost::shared_ptr< Comm_t > comm_
Definition: Pilot.h:166
std::string input_file_
input file for simulation with embedded optimization problem
Definition: Pilot.h:180
std::vector< bool > is_worker_idle_
Definition: Pilot.h:193
virtual void startPilot()
Definition: Pilot.h:311
int global_rank_
Definition: Pilot.h:169
bool stop(bool isOpt=true)
Definition: Pilot.h:573
int my_rank_in_worker_comm_
Definition: Pilot.h:170
Expressions::Named_t constraints_
constraints
Definition: Pilot.h:187
bool continue_polling_
Definition: Pilot.h:184
MPI_Comm opt_comm_
MPI communicator used for messages to/from optimizer.
Definition: Pilot.h:162
boost::scoped_ptr< Trace > job_trace_
Definition: Pilot.h:203
Expressions::Named_t objectives_
objectives
Definition: Pilot.h:186
MPI_Comm worker_comm_
MPI communicator used for messages to/from worker.
Definition: Pilot.h:160
DVarContainer_t dvars_
design variables
Definition: Pilot.h:188
void parseInputFile(functionDictionary_t, bool isOptimizationRun)
Definition: Pilot.h:238