OPAL (Object Oriented Parallel Accelerator Library)  2021.1.99
OPAL
Worker.h
Go to the documentation of this file.
1 //
2 // Class Worker
3 // A worker MPI entity consists of a processor group that runs a
4 // simulation of type Sim_t. The main loop in run() accepts new jobs from the
5 // master process, runs the simulation, and reports back the results.
6 //
7 // @see Pilot
8 // @see Poller
9 // @see MPIHelper.h
10 //
11 // @tparam Sim_t type of simulation to run
12 //
13 // Copyright (c) 2010 - 2013, Yves Ineichen, ETH Zürich
14 // All rights reserved
15 //
16 // Implemented as part of the PhD thesis
17 // "Toward massively parallel multi-objective optimization with application to
18 // particle accelerators" (https://doi.org/10.3929/ethz-a-009792359)
19 //
20 // This file is part of OPAL.
21 //
22 // OPAL is free software: you can redistribute it and/or modify
23 // it under the terms of the GNU General Public License as published by
24 // the Free Software Foundation, either version 3 of the License, or
25 // (at your option) any later version.
26 //
27 // You should have received a copy of the GNU General Public License
28 // along with OPAL. If not, see <https://www.gnu.org/licenses/>.
29 //
30 #ifndef __WORKER_H__
31 #define __WORKER_H__
32 
33 #include <iostream>
34 
35 #include "boost/smart_ptr.hpp"
36 
37 #include "Pilot/Poller.h"
38 
39 #include "Comm/types.h"
40 #include "Util/Types.h"
41 #include "Util/MPIHelper.h"
42 #include "Util/CmdArguments.h"
43 
44 template <class Sim_t>
45 class Worker : protected Poller {
46 
47 public:
48 
50  std::string simName,
51  Comm::Bundle_t comms,
52  CmdArguments_t args)
53  : Poller(comms.worker)
54  , cmd_args_(args)
55  {
56  constraints_ = constraints;
57  simulation_name_ = simName;
59  is_idle_ = true;
60  coworker_comm_ = comms.coworkers;
61 
62  leader_pid_ = 0;
63  MPI_Comm_size(coworker_comm_, &num_coworkers_);
64  }
65 
67  Expressions::Named_t constraints,
68  std::string simName,
69  Comm::Bundle_t comms,
70  CmdArguments_t args,
71  const std::map<std::string, std::string> &userVariables,
72  bool isOptimizer = true)
73  : Poller(comms.worker)
74  , cmd_args_(args)
75  , userVariables_(userVariables)
76  {
77  objectives_ = objectives;
78  constraints_ = constraints;
79  simulation_name_ = simName;
81  is_idle_ = true;
82  coworker_comm_ = comms.coworkers;
83 
84  leader_pid_ = 0;
85  MPI_Comm_size(coworker_comm_, &num_coworkers_);
86 
87  if (!isOptimizer) return;
88  int my_local_pid = 0;
89  MPI_Comm_rank(coworker_comm_, &my_local_pid);
90 
91  // distinction between leader and coworkers
92  if(my_local_pid == leader_pid_)
93  run();
94  else
95  runCoWorker();
96  }
97 
99  {}
100 
101 
102 protected:
103  typedef boost::scoped_ptr<Sim_t> SimPtr_t;
104 
105  bool is_idle_;
106  MPI_Comm coworker_comm_;
107 
110 
111 
114  void runCoWorker() {
115 
116  MPI_Request stop_req;
117  size_t stop_value = 0;
118 
119  MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
120  MPI_ANY_TAG, coworker_comm_, &stop_req);
121  is_running_ = true;
122 
123  while(is_running_) {
124 
125  //FIXME: bcast blocks after our leader stopped working.
126  // One option is to create a new class implementing a coworker in the
127  // same manner as the worker (poll loop). Either way, there is no way
128  // around removing the Bcast and adding another tag in the poll
129  // loop above in order to be able to exit cleanly.
130  if(stop_req != MPI_REQUEST_NULL) {
131  MPI_Status status;
132  int flag = 0;
133  MPI_Test(&stop_req, &flag, &status);
134 
135  if(flag) {
136 
137  if(status.MPI_TAG == MPI_COWORKER_NEW_JOB_TAG) {
138  Param_t params;
140 
141  try {
142  SimPtr_t sim(new Sim_t(objectives_, constraints_,
145 
146  sim->run();
147  } catch(OptPilotException &ex) {
148  std::cout << "Exception while running simulation: "
149  << ex.what() << std::endl;
150  }
151  MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
152  MPI_ANY_TAG, coworker_comm_, &stop_req);
153  }
154 
155  if(status.MPI_TAG == MPI_STOP_TAG) {
156  is_running_ = false;
157  break;
158  }
159  }
160  }
161  }
162  }
163 
164 
165 protected:
166 
170  std::string simulation_name_;
172 
173  const std::map<std::string, std::string> userVariables_;
174 
176  void notifyCoWorkers(int tag) {  // send a placeholder message carrying `tag` to every coworker rank
177 
178  for(int i=0; i < num_coworkers_; i++) {  // num_coworkers_ is the size of coworker_comm_ (set in the constructors)
179  if(i == leader_pid_) continue;  // skip the leader rank
180 
181  size_t dummy = 0;  // payload value is a placeholder; runCoWorker() dispatches on the MPI tag
182  MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, i, tag, coworker_comm_);  // blocking send to rank i
183  }
184  }
185 
186  void setupPoll() {
187  size_t dummy = 1;
188  MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
190  }
191 
192  void prePoll()  // hook executed before checking for a new request
193  {}  // no-op: the worker has nothing to do at this point
194 
195  void postPoll()  // hook executed after handling a new request (if any)
196  {}  // no-op: the worker has nothing to do at this point
197 
198  void onStop() {
199  if(num_coworkers_ > 1)
201  }
202 
203  virtual bool onMessage(MPI_Status status, size_t recv_value) {
204 
205  if(status.MPI_TAG == MPI_WORK_JOBID_TAG) {
206 
207  is_idle_ = false;
208  size_t job_id = recv_value;
209 
210  // get new job
211  Param_t params;
212  MPI_Recv_params(params, (size_t)pilot_rank_, comm_m);
213 
214  // and forward to coworkers (if any)
215  if(num_coworkers_ > 1) {
218  }
219 
220  //XXX we need to know if we want EVAL or DERIVATIVE
221  //reqVarContainer_t reqVars;
222  //MPI_Recv_reqvars(reqVars, (size_t)pilot_rank_, comm_m);
223 
224  reqVarContainer_t requested_results;
225  try {
226  SimPtr_t sim(new Sim_t(objectives_, constraints_,
228  userVariables_));
229 
230  // run simulation in a "blocking" fashion
231  sim->run();
232  sim->collectResults();
233  sim->cleanUp();
234  requested_results = sim->getResults();
235  } catch(OptPilotException &ex) {
236  std::cout << "Exception while running simulation: "
237  << ex.what() << std::endl;
238  }
239 
240  MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, pilot_rank_,
242 
243  size_t dummy = 0;
244  MPI_Recv(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
246 
247  MPI_Send_reqvars(requested_results, (size_t)pilot_rank_, comm_m);
248 
249  is_idle_ = true;
250  return true;
251 
252  } else {
253  std::stringstream os;
254  os << "Unexpected MPI_TAG: " << status.MPI_TAG;
255  std::cout << "(Worker) Error: " << os.str() << std::endl;
256  throw OptPilotException("Worker::onMessage", os.str());
257  }
258  }
259 };
260 
261 #endif
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
boost::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:176
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:146
void MPI_Bcast_params(Param_t &params, size_t root, MPI_Comm comm)
broadcast params to all entities in comm
Definition: MPIHelper.cpp:60
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:128
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:52
#define MPI_WORKER_FINISHED_TAG
notify pilot that work has been finished and results are ready to collect
Definition: MPIHelper.h:35
#define MPI_COWORKER_NEW_JOB_TAG
notifies coworkers about new jobs
Definition: MPIHelper.h:40
#define MPI_WORKER_STATUSUPDATE_TAG
notify pilot about worker status
Definition: MPIHelper.h:33
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies worker that he is ready to collect the results
Definition: MPIHelper.h:37
#define MPI_STOP_TAG
global stop tag to exit poll loop
Definition: MPIHelper.h:64
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:79
namedVariableCollection_t Param_t
Definition: Types.h:48
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expression with a name
Definition: Expression.h:74
bundles all communicators for a specific role/pid
Definition: types.h:32
MPI_Comm coworkers
Definition: types.h:39
int master_local_pid
Definition: types.h:36
Definition: Poller.h:35
bool is_running_
Definition: Poller.h:54
MPI_Comm comm_m
communicator the poller listens to requests
Definition: Poller.h:52
virtual void run()
Definition: Poller.h:79
Definition: Worker.h:45
Expressions::Named_t objectives_
Definition: Worker.h:108
~Worker()
Definition: Worker.h:98
Worker(Expressions::Named_t objectives, Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args, const std::map< std::string, std::string > &userVariables, bool isOptimizer=true)
Definition: Worker.h:66
std::string simulation_name_
Definition: Worker.h:170
void runCoWorker()
Definition: Worker.h:114
int pilot_rank_
Definition: Worker.h:169
int num_coworkers_
Definition: Worker.h:168
void postPoll()
executed after handling (if any) new request
Definition: Worker.h:195
void onStop()
enable implementation to react to STOP tag
Definition: Worker.h:198
MPI_Comm coworker_comm_
Definition: Worker.h:106
void notifyCoWorkers(int tag)
notify coworkers of incoming broadcast
Definition: Worker.h:176
void prePoll()
executed before checking for new request
Definition: Worker.h:192
CmdArguments_t cmd_args_
Definition: Worker.h:171
int leader_pid_
Definition: Worker.h:167
const std::map< std::string, std::string > userVariables_
Definition: Worker.h:173
void setupPoll()
executed before starting polling loop
Definition: Worker.h:186
Worker(Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args)
Definition: Worker.h:49
virtual bool onMessage(MPI_Status status, size_t recv_value)
Definition: Worker.h:203
boost::scoped_ptr< Sim_t > SimPtr_t
Definition: Worker.h:103
Expressions::Named_t constraints_
Definition: Worker.h:109
bool is_idle_
Definition: Worker.h:105
virtual const char * what() const