OPAL (Object Oriented Parallel Accelerator Library)  2.2.0
OPAL
Worker.h
Go to the documentation of this file.
1 #ifndef __WORKER_H__
2 #define __WORKER_H__
3 
4 #include <iostream>
5 
6 #include "boost/smart_ptr.hpp"
7 
8 #include "Pilot/Poller.h"
9 
10 #include "Comm/types.h"
11 #include "Util/Types.h"
12 #include "Util/MPIHelper.h"
13 #include "Util/CmdArguments.h"
14 
/// Worker entity of the pilot framework: a group of ranks (the "coworker"
/// communicator) that runs simulations of type Sim_t for jobs received from
/// the pilot rank and sends the requested results back.
///
/// Rank leader_pid_ (always 0) of the coworker communicator talks to the
/// pilot through the Poller hooks below (setupPoll/prePoll/postPoll/onStop/
/// onMessage); every other rank of that communicator runs runCoWorker().
///
/// NOTE(review): this listing's own line numbers skip in several places
/// (for example 124, 168, 206, 219 and 223, where a call is visibly cut off
/// in the middle of its argument list). Statements on the skipped lines are
/// not visible here; comments below only describe what is visible.
///
/// NOTE(review): size_t values are transferred as MPI_UNSIGNED_LONG
/// throughout; this assumes size_t and unsigned long have the same size on
/// the target platform -- TODO confirm.
27 template <class Sim_t>
28 class Worker : protected Poller {
29 
30 public:
31 
    /// Constructor storing constraints, simulation name and command line
    /// arguments, and deriving the coworker group layout from comms.
    /// The first line of the signature (listing line 32) is not part of this
    /// extract; the listing's cross-reference gives it as
    /// Worker(Expressions::Named_t constraints, std::string simName,
    ///        Comm::Bundle_t comms, CmdArguments_t args).
    /// In the visible part no polling loop is started by this constructor.
33  std::string simName,
34  Comm::Bundle_t comms,
35  CmdArguments_t args)
36  : Poller(comms.worker)
37  , cmd_args_(args)
38  {
39  constraints_ = constraints;
40  simulation_name_ = simName;
42  is_idle_ = true;
43  coworker_comm_ = comms.coworkers;
44 
    // Rank 0 of the coworker communicator is the leader. num_coworkers_ is
    // the size of that communicator, so it counts the leader as well.
45  leader_pid_ = 0;
46  MPI_Comm_size(coworker_comm_, &num_coworkers_);
47  }
48 
    /// Constructor additionally storing objectives. Unless isOptimizer is
    /// false, it does not return until the worker loop ends: the leader rank
    /// calls run(), all other ranks of the coworker communicator call
    /// runCoWorker().
    /// The first line of the signature (listing line 49) is not part of this
    /// extract; the cross-reference shows the leading parameter as
    /// Expressions::Named_t objectives.
50  Expressions::Named_t constraints,
51  std::string simName,
52  Comm::Bundle_t comms,
53  CmdArguments_t args,
54  bool isOptimizer = true)
55  : Poller(comms.worker)
56  , cmd_args_(args)
57  {
58  objectives_ = objectives;
59  constraints_ = constraints;
60  simulation_name_ = simName;
62  is_idle_ = true;
63  coworker_comm_ = comms.coworkers;
64 
65  leader_pid_ = 0;
66  MPI_Comm_size(coworker_comm_, &num_coworkers_);
67 
    // With isOptimizer == false the object is only set up; no loop is entered.
68  if (!isOptimizer) return;
69  int my_local_pid = 0;
70  MPI_Comm_rank(coworker_comm_, &my_local_pid);
71 
72  // distinction between leader and coworkers
73  if(my_local_pid == leader_pid_)
74  run();
75  else
76  runCoWorker();
77  }
78 
    /// Empty body of the destructor; the declarator line (listing line 79,
    /// ~Worker() according to the cross-reference) is not part of this extract.
80  {}
81 
82 
83 protected:
    /// Scoped owner of one simulation instance; the simulation is destroyed
    /// when the pointer goes out of scope.
84  typedef boost::scoped_ptr<Sim_t> SimPtr_t;
85 
    /// True while no job is being processed (cleared at the start of a job
    /// in onMessage() and set again once the results have been sent).
86  bool is_idle_;
    /// Communicator connecting the leader with its coworkers.
87  MPI_Comm coworker_comm_;
88 
91 
92 
    /// Loop executed by every non-leader rank of the coworker communicator.
    /// A non-blocking receive from the leader (any tag) is posted and tested
    /// repeatedly. On MPI_COWORKER_NEW_JOB_TAG a Sim_t is constructed and
    /// run, then the receive is posted again; on MPI_STOP_TAG the loop ends.
    /// An OptPilotException thrown by the simulation is printed to std::cout
    /// and does not end the loop.
95  void runCoWorker() {
96 
97  MPI_Request stop_req;
98  size_t stop_value = 0;
99 
100  MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
101  MPI_ANY_TAG, coworker_comm_, &stop_req);
102  is_running_ = true;
103 
104  while(is_running_) {
105 
106  //FIXME: bcast blocks after our leader stopped working
107  // Either we create a new class implementing a coworker in the
108  // same manner as the worker (poll loop). Anyway there is no way
109  // around removing the Bcast and adding another tag in the poll
110  // loop above in order to be able to exit cleanly.
111  if(stop_req != MPI_REQUEST_NULL) {
112  MPI_Status status;
113  int flag = 0;
114  MPI_Test(&stop_req, &flag, &status);
115 
116  if(flag) {
117 
118  if(status.MPI_TAG == MPI_COWORKER_NEW_JOB_TAG) {
119  Param_t params;
    // NOTE(review): listing line 120 is missing here; presumably the
    // parameters are received from the leader at this point -- confirm
    // against the original header.
121 
122  try {
    // NOTE(review): listing line 124 (middle of the Sim_t argument
    // list) is missing from this extract.
123  SimPtr_t sim(new Sim_t(objectives_, constraints_,
125  cmd_args_));
126 
127  sim->run();
128  } catch(OptPilotException &ex) {
129  std::cout << "Exception while running simulation: "
130  << ex.what() << std::endl;
131  }
    // Re-arm the receive so the next job or stop message is seen.
132  MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
133  MPI_ANY_TAG, coworker_comm_, &stop_req);
134  }
135 
136  if(status.MPI_TAG == MPI_STOP_TAG) {
137  is_running_ = false;
138  break;
139  }
140  }
141  }
142  }
143  }
144 
145 
146 protected:
147 
    /// Name of the simulation as passed to the constructor.
    /// (Listing lines 148-150 and 152 are not part of this extract; the
    /// cross-reference places leader_pid_, num_coworkers_, pilot_rank_ and
    /// cmd_args_ there.)
151  std::string simulation_name_;
153 
    /// Notify coworkers: sends a dummy value (0) with the given tag to every
    /// rank of the coworker communicator except the leader itself.
    /// @param tag MPI tag the coworkers will see in runCoWorker().
155  void notifyCoWorkers(int tag) {
156 
157  for(int i=0; i < num_coworkers_; i++) {
158  if(i == leader_pid_) continue;
159 
160  size_t dummy = 0;
161  MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, i, tag, coworker_comm_);
162  }
163  }
164 
    /// Poller hook executed before the polling loop starts: sends the value
    /// 1 to the pilot rank.
    /// NOTE(review): listing line 168 (tag and communicator arguments) is
    /// missing; presumably MPI_WORKER_STATUSUPDATE_TAG on comm_m -- confirm.
165  void setupPoll() {
166  size_t dummy = 1;
167  MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
169  }
170 
    /// Poller hook executed before checking for a new request; nothing to do.
171  void prePoll()
172  {}
173 
    /// Poller hook executed after handling a request (if any); nothing to do.
174  void postPoll()
175  {}
176 
    /// Poller hook that lets the worker react to the STOP tag. Only acts
    /// when the coworker communicator has more than one rank.
    /// NOTE(review): the guarded statement (listing line 179) is missing;
    /// presumably the coworkers are notified with MPI_STOP_TAG -- confirm.
177  void onStop() {
178  if(num_coworkers_ > 1)
180  }
181 
    /// Handles a message received by the polling loop.
    ///
    /// For MPI_WORK_JOBID_TAG: takes recv_value as the job id, receives the
    /// job parameters from the pilot, runs a Sim_t in a blocking fashion,
    /// then sends the job id to the pilot, receives one value from the
    /// pilot and sends the requested results. If the simulation throws
    /// OptPilotException the message is printed and the result container
    /// is sent as it is at that point (it is only assigned after a fully
    /// successful run).
    ///
    /// @param status     MPI status of the received message (its tag selects
    ///                   the action).
    /// @param recv_value value carried by the message (the job id).
    /// @return true after a job has been handled.
    /// @throws OptPilotException for any tag other than MPI_WORK_JOBID_TAG.
182  virtual bool onMessage(MPI_Status status, size_t recv_value) {
183 
184  if(status.MPI_TAG == MPI_WORK_JOBID_TAG) {
185 
186  is_idle_ = false;
187  size_t job_id = recv_value;
188 
189  // get new job
190  Param_t params;
191  MPI_Recv_params(params, (size_t)pilot_rank_, comm_m);
192 
193  // and forward to coworkers (if any)
    // NOTE(review): the body of this branch (listing lines 195-196) is
    // missing from this extract.
194  if(num_coworkers_ > 1) {
197  }
198 
199  //XXX we need to know if we want EVAL or DERIVATIVE
200  //reqVarContainer_t reqVars;
201  //MPI_Recv_reqvars(reqVars, (size_t)pilot_rank_, comm_m);
202 
203  reqVarContainer_t requested_results;
204  try {
    // NOTE(review): listing line 206 (rest of the Sim_t argument list)
    // is missing from this extract.
205  SimPtr_t sim(new Sim_t(objectives_, constraints_,
207 
208  // run simulation in a "blocking" fashion
209  sim->run();
210  sim->collectResults();
211  sim->cleanUp();
212  requested_results = sim->getResults();
213  } catch(OptPilotException &ex) {
214  std::cout << "Exception while running simulation: "
215  << ex.what() << std::endl;
216  }
217 
    // Three-step hand-over with the pilot: send the job id, receive one
    // value back, then send the results. The tag arguments are on the
    // missing listing lines 219 and 223 (presumably
    // MPI_WORKER_FINISHED_TAG and MPI_WORKER_FINISHED_ACK_TAG -- confirm).
218  MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, pilot_rank_,
220 
221  size_t dummy = 0;
222  MPI_Recv(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
224 
225  MPI_Send_reqvars(requested_results, (size_t)pilot_rank_, comm_m);
226 
227  is_idle_ = true;
228  return true;
229 
230  } else {
    // Any other tag is a protocol error: report it and throw.
231  std::stringstream os;
232  os << "Unexpected MPI_TAG: " << status.MPI_TAG;
233  std::cout << "(Worker) Error: " << os.str() << std::endl;
234  throw OptPilotException("Worker::onMessage", os.str());
235  }
236  }
237 };
238 
239 #endif
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:64
MPI_Comm coworker_comm_
Definition: Worker.h:87
virtual const char * what() const
void prePoll()
executed before checking for new request
Definition: Worker.h:171
#define MPI_WORKER_STATUSUPDATE_TAG
notify pilot about worker status
Definition: MPIHelper.h:13
CmdArguments_t cmd_args_
Definition: Worker.h:152
#define MPI_WORKER_FINISHED_TAG
notify pilot that work has been finished and results are ready to collect
Definition: MPIHelper.h:15
void notifyCoWorkers(int tag)
notify coworkers of incoming broadcast
Definition: Worker.h:155
int pilot_rank_
Definition: Worker.h:150
boost::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:169
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies worker that it is ready to collect the results
Definition: MPIHelper.h:17
void postPoll()
executed after handling (if any) new request
Definition: Worker.h:174
A worker MPI entity consists of a processor group that runs a simulation of type Sim_t. The main loop in run() accepts new jobs from the master process, runs the simulation, and reports back the results.
Definition: Worker.h:28
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:125
int leader_pid_
Definition: Worker.h:148
bool is_idle_
Definition: Worker.h:86
namedVariableCollection_t Param_t
Definition: Types.h:33
boost::scoped_ptr< Sim_t > SimPtr_t
Definition: Worker.h:84
void onStop()
enable implementation to react to STOP tag
Definition: Worker.h:177
virtual bool onMessage(MPI_Status status, size_t recv_value)
Definition: Worker.h:182
virtual void run()
Definition: Poller.h:62
Worker(Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args)
Definition: Worker.h:32
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:32
Worker(Expressions::Named_t objectives, Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args, bool isOptimizer=true)
Definition: Worker.h:49
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:107
int num_coworkers_
Definition: Worker.h:149
~Worker()
Definition: Worker.h:79
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expression with a name
Definition: Expression.h:45
MPI_Comm coworkers
Definition: types.h:21
int master_local_pid
Definition: types.h:18
#define MPI_COWORKER_NEW_JOB_TAG
notifies coworkers about new jobs
Definition: MPIHelper.h:20
std::string simulation_name_
Definition: Worker.h:151
void runCoWorker()
Definition: Worker.h:95
void MPI_Bcast_params(Param_t &params, size_t root, MPI_Comm comm)
broadcast params to all entities in comm
Definition: MPIHelper.cpp:39
Expressions::Named_t objectives_
Definition: Worker.h:89
bundles all communicators for a specific role/pid
Definition: types.h:14
An interface implementing the basics of a poll loop, posting an MPI_Irecv and waiting for new request...
Definition: Poller.h:18
void setupPoll()
executed before starting polling loop
Definition: Worker.h:165
bool is_running_
Definition: Poller.h:37
MPI_Comm comm_m
communicator on which the poller listens for requests
Definition: Poller.h:35
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
#define MPI_STOP_TAG
global stop tag to exit poll loop
Definition: MPIHelper.h:44
Expressions::Named_t constraints_
Definition: Worker.h:90