OPAL (Object Oriented Parallel Accelerator Library)  2024.1
OPAL
Worker.h
Go to the documentation of this file.
1 //
2 // Class Worker
3 // A worker MPI entity consists of a processor group that runs a
4 // simulation of type Sim_t. The main loop in run() accepts new jobs from the
5 // master process, runs the simulation, and reports back the results.
6 //
7 // @see Pilot
8 // @see Poller
9 // @see MPIHelper.h
10 //
11 // @tparam Sim_t type of simulation to run
12 //
13 // Copyright (c) 2010 - 2013, Yves Ineichen, ETH Zürich
14 // All rights reserved
15 //
16 // Implemented as part of the PhD thesis
17 // "Toward massively parallel multi-objective optimization with application to
18 // particle accelerators" (https://doi.org/10.3929/ethz-a-009792359)
19 //
20 // This file is part of OPAL.
21 //
22 // OPAL is free software: you can redistribute it and/or modify
23 // it under the terms of the GNU General Public License as published by
24 // the Free Software Foundation, either version 3 of the License, or
25 // (at your option) any later version.
26 //
27 // You should have received a copy of the GNU General Public License
28 // along with OPAL. If not, see <https://www.gnu.org/licenses/>.
29 //
30 #ifndef __WORKER_H__
31 #define __WORKER_H__
32 
33 #include <iostream>
34 
35 #include "Pilot/Poller.h"
36 
37 #include "Comm/types.h"
38 #include "Util/Types.h"
39 #include "Util/MPIHelper.h"
40 #include "Util/CmdArguments.h"
41 
42 template <class Sim_t>
43 class Worker : protected Poller {
44 
45 public:
46 
48  std::string simName,
49  Comm::Bundle_t comms,
50  CmdArguments_t args)
51  : Poller(comms.worker)
52  , cmd_args_(args)
53  {
54  constraints_ = constraints;
55  simulation_name_ = simName;
57  is_idle_ = true;
58  coworker_comm_ = comms.coworkers;
59 
60  leader_pid_ = 0;
61  MPI_Comm_size(coworker_comm_, &num_coworkers_);
62  }
63 
65  Expressions::Named_t constraints,
66  std::string simName,
67  Comm::Bundle_t comms,
68  CmdArguments_t args,
69  const std::map<std::string, std::string> &userVariables,
70  bool isOptimizer = true)
71  : Poller(comms.worker)
72  , cmd_args_(args)
73  , userVariables_(userVariables)
74  {
75  objectives_ = objectives;
76  constraints_ = constraints;
77  simulation_name_ = simName;
79  is_idle_ = true;
80  coworker_comm_ = comms.coworkers;
81 
82  leader_pid_ = 0;
83  MPI_Comm_size(coworker_comm_, &num_coworkers_);
84 
85  if (!isOptimizer) return;
86  int my_local_pid = 0;
87  MPI_Comm_rank(coworker_comm_, &my_local_pid);
88 
89  // distinction between leader and coworkers
90  if(my_local_pid == leader_pid_)
91  run();
92  else
93  runCoWorker();
94  }
95 
97  {}
98 
99 
100 protected:
101  typedef const std::unique_ptr<Sim_t> SimPtr_t;
102 
103  bool is_idle_;
104  MPI_Comm coworker_comm_;
105 
108 
109 
112  void runCoWorker() {
113 
114  MPI_Request stop_req;
115  size_t stop_value = 0;
116 
117  MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
118  MPI_ANY_TAG, coworker_comm_, &stop_req);
119  is_running_ = true;
120 
121  while(is_running_) {
122 
123  //FIXME: bcast blocks after our leader stopped working
124  // Either we create a new class implementing a coworker in the
125  // same manner as the worker (poll loop). Anyway there is no way
126  // around removing the Bcast and adding another tag in the poll
127  // loop above in order to be able to exit cleanly.
128  if(stop_req != MPI_REQUEST_NULL) {
129  MPI_Status status;
130  int flag = 0;
131  MPI_Test(&stop_req, &flag, &status);
132 
133  if(flag) {
134 
135  if(status.MPI_TAG == MPI_COWORKER_NEW_JOB_TAG) {
136  Param_t params;
138 
139  try {
140  SimPtr_t sim(new Sim_t(objectives_, constraints_,
143 
144  sim->run();
145  } catch(OptPilotException &ex) {
146  std::cout << "Exception while running simulation: "
147  << ex.what() << std::endl;
148  }
149  MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
150  MPI_ANY_TAG, coworker_comm_, &stop_req);
151  }
152 
153  if(status.MPI_TAG == MPI_STOP_TAG) {
154  is_running_ = false;
155  break;
156  }
157  }
158  }
159  }
160  }
161 
162 
163 protected:
164 
168  std::string simulation_name_;
170 
171  const std::map<std::string, std::string> userVariables_;
172 
174  void notifyCoWorkers(int tag) {
175 
176  for(int i=0; i < num_coworkers_; i++) {
177  if(i == leader_pid_) continue;
178 
179  size_t dummy = 0;
180  MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, i, tag, coworker_comm_);
181  }
182  }
183 
184  void setupPoll() {
185  size_t dummy = 1;
186  MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
188  }
189 
/// Hook executed before each check for a new request; intentionally empty.
void prePoll() {
    // nothing to prepare before polling
}
192 
/// Hook executed after a request (if any) has been handled; intentionally empty.
void postPoll() {
    // nothing to clean up after polling
}
195 
196  void onStop() {
197  if(num_coworkers_ > 1)
199  }
200 
201  virtual bool onMessage(MPI_Status status, size_t recv_value) {
202 
203  if(status.MPI_TAG == MPI_WORK_JOBID_TAG) {
204 
205  is_idle_ = false;
206  size_t job_id = recv_value;
207 
208  // get new job
209  Param_t params;
210  MPI_Recv_params(params, (size_t)pilot_rank_, comm_m);
211 
212  // and forward to coworkers (if any)
213  if(num_coworkers_ > 1) {
216  }
217 
218  //XXX we need to know if we want EVAL or DERIVATIVE
219  //reqVarContainer_t reqVars;
220  //MPI_Recv_reqvars(reqVars, (size_t)pilot_rank_, comm_m);
221 
222  reqVarContainer_t requested_results;
223  try {
224  SimPtr_t sim(new Sim_t(objectives_, constraints_,
226  userVariables_));
227 
228  // run simulation in a "blocking" fashion
229  sim->run();
230  sim->collectResults();
231  sim->cleanUp();
232  requested_results = sim->getResults();
233  } catch(OptPilotException &ex) {
234  std::cout << "Exception while running simulation: "
235  << ex.what() << std::endl;
236  }
237 
238  MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, pilot_rank_,
240 
241  size_t dummy = 0;
242  MPI_Recv(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
244 
245  MPI_Send_reqvars(requested_results, (size_t)pilot_rank_, comm_m);
246 
247  is_idle_ = true;
248  return true;
249 
250  } else {
251  std::stringstream os;
252  os << "Unexpected MPI_TAG: " << status.MPI_TAG;
253  std::cout << "(Worker) Error: " << os.str() << std::endl;
254  throw OptPilotException("Worker::onMessage", os.str());
255  }
256  }
257 };
258 
259 #endif
namedVariableCollection_t Param_t
Definition: Types.h:48
#define MPI_WORKER_STATUSUPDATE_TAG
notify pilot about worker status
Definition: MPIHelper.h:33
int pilot_rank_
Definition: Worker.h:167
bool is_running_
Definition: Poller.h:54
#define MPI_WORKER_FINISHED_TAG
notify pilot that work has been finished and results are ready to collect
Definition: MPIHelper.h:35
void postPoll()
executed after handling a new request (if any)
Definition: Worker.h:193
virtual const char * what() const
void setupPoll()
executed before starting the polling loop
Definition: Worker.h:184
MPI_Comm comm_m
communicator on which the poller listens for requests
Definition: Poller.h:52
int num_coworkers_
Definition: Worker.h:166
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
Worker(Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args)
Definition: Worker.h:47
void MPI_Bcast_params(Param_t &params, size_t root, MPI_Comm comm)
broadcast params to all entities in comm
Definition: MPIHelper.cpp:60
Definition: Poller.h:35
bool is_idle_
Definition: Worker.h:103
std::map< std::string, Expressions::Expr_t * > Named_t
type of a collection of named expressions
Definition: Expression.h:74
void notifyCoWorkers(int tag)
notify coworkers of incoming broadcast
Definition: Worker.h:174
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:52
void prePoll()
executed before checking for a new request
Definition: Worker.h:190
MPI_Comm coworkers
Definition: types.h:39
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:146
int master_local_pid
Definition: types.h:36
std::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:176
~Worker()
Definition: Worker.h:96
const std::map< std::string, std::string > userVariables_
Definition: Worker.h:171
void runCoWorker()
Definition: Worker.h:112
Expressions::Named_t constraints_
Definition: Worker.h:107
int leader_pid_
Definition: Worker.h:165
#define MPI_STOP_TAG
global stop tag to exit poll loop (
Definition: MPIHelper.h:64
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies the worker that it is ready to collect the results
Definition: MPIHelper.h:37
virtual bool onMessage(MPI_Status status, size_t recv_value)
Definition: Worker.h:201
std::string simulation_name_
Definition: Worker.h:168
Definition: Worker.h:43
Worker(Expressions::Named_t objectives, Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args, const std::map< std::string, std::string > &userVariables, bool isOptimizer=true)
Definition: Worker.h:64
bundles all communicators for a specific role/pid
Definition: types.h:32
#define MPI_COWORKER_NEW_JOB_TAG
notifies coworkers about new jobs
Definition: MPIHelper.h:40
const std::unique_ptr< Sim_t > SimPtr_t
Definition: Worker.h:101
MPI_Comm coworker_comm_
Definition: Worker.h:104
Expressions::Named_t objectives_
Definition: Worker.h:106
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:128
CmdArguments_t cmd_args_
Definition: Worker.h:169
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:79
void onStop()
enables the implementation to react to the STOP tag
Definition: Worker.h:196
virtual void run()
Definition: Poller.h:79