OPAL (Object Oriented Parallel Accelerator Library)  2.2.0
OPAL
SampleWorker.h
Go to the documentation of this file.
1 #ifndef __SAMPLE_WORKER_H__
2 #define __SAMPLE_WORKER_H__
3 
4 #include "Pilot/Worker.h"
5 
6 
19 template <class Sim_t>
20 class SampleWorker : protected Worker<Sim_t> {
21 
22 public:
23 
25  Expressions::Named_t constraints,
26  std::string simName,
27  Comm::Bundle_t comms,
28  CmdArguments_t args,
29  const std::vector<std::string> &storeobjstr,
30  const std::vector<std::string> &filesToKeep)
31  : Worker<Sim_t>(objectives, constraints, simName, comms, args, false)
32  , statVariablesToStore_m(storeobjstr)
33  , filesToKeep_m(filesToKeep)
34  {
35 
36  int my_local_pid = 0;
37  MPI_Comm_rank(this->coworker_comm_, &my_local_pid);
38 
39  // distinction between leader and coworkers
40  if(my_local_pid == this->leader_pid_)
41  this->run();
42  else
43  runSlave();
44  }
45 
47  {}
48 
49 protected:
50  // FIXME Open issue #250 (https://gitlab.psi.ch/OPAL/src/issues/250)
51  const std::vector<std::string> statVariablesToStore_m;
52 
54  void notifyCoWorkers(size_t job_id, int tag) {
55 
56  for(int i=0; i < this->num_coworkers_; i++) {
57  if(i == this->leader_pid_) continue;
58 
59  // send job id to co workers
60  MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, i, tag, this->coworker_comm_);
61  }
62  }
63 
66  void runSlave() {
67  /* needs to be executed by derived class otherwise
68  * a base class instance is created.
69  */
70 
71  MPI_Request stop_req;
72  size_t job_id = 0;
73 
74  MPI_Irecv(&job_id, 1, MPI_UNSIGNED_LONG, this->leader_pid_,
75  MPI_ANY_TAG, this->coworker_comm_, &stop_req);
76  this->is_running_ = true;
77 
78  while(this->is_running_) {
79 
80  //FIXME: bcast blocks after our leader stopped working
81  // Either we create a new class implementing a coworker in the
82  // same manner as the worker (poll loop). Anyway there is no way
83  // around removing the Bcast and adding another tag in the poll
84  // loop above in order to be able to exit cleanly.
85  if(stop_req != MPI_REQUEST_NULL) {
86  MPI_Status status;
87  int flag = 0;
88  MPI_Test(&stop_req, &flag, &status);
89 
90  if(flag) {
91 
92  if(status.MPI_TAG == MPI_COWORKER_NEW_JOB_TAG) {
93  Param_t params;
94  MPI_Bcast_params(params, this->leader_pid_, this->coworker_comm_);
95 
96  try {
97  typename Worker<Sim_t>::SimPtr_t sim(new Sim_t(this->objectives_, this->constraints_,
98  params, this->simulation_name_, this->coworker_comm_,
99  this->cmd_args_));
100 
101  sim->setFilename(job_id);
102 
103  sim->run();
104  } catch(OptPilotException &ex) {
105  std::cout << "Exception while running simulation: "
106  << ex.what() << std::endl;
107  }
108  MPI_Irecv(&job_id, 1, MPI_UNSIGNED_LONG, this->leader_pid_,
109  MPI_ANY_TAG, this->coworker_comm_, &stop_req);
110  }
111 
112  if(status.MPI_TAG == MPI_STOP_TAG) {
113  this->is_running_ = false;
114  break;
115  }
116  }
117  }
118  }
119  }
120 
121  bool onMessage(MPI_Status status, size_t recv_value) override {
122 
123  if(status.MPI_TAG == MPI_WORK_JOBID_TAG) {
124 
125  this->is_idle_ = false;
126  size_t job_id = recv_value;
127 
128  // get new job
129  Param_t params;
130  MPI_Recv_params(params, (size_t)this->pilot_rank_, this->comm_m);
131 
132  // and forward to coworkers (if any)
133  if(this->num_coworkers_ > 1) {
135  MPI_Bcast_params(params, this->leader_pid_, this->coworker_comm_);
136  }
137 
138  reqVarContainer_t requested_results;
139  try {
140  typename Worker<Sim_t>::SimPtr_t sim(new Sim_t(this->objectives_,
141  this->constraints_,
142  params,
143  this->simulation_name_,
144  this->coworker_comm_,
145  this->cmd_args_));
146 
147  sim->setFilename(job_id);
148 
149  // run simulation in a "blocking" fashion
150  sim->run();
151 
152  // this is requests the columns from the stat file and stores them
153  // in a map with the column names as key and the columns as values; for #250
154  //
155  // std::map<std::string,
156  // std::vector<double> > data = sim->getData(statVariablesToStore_m);
157 
158  sim->collectResults();
159  requested_results = sim->getResults();
160 
161  // base clase of SamplePilot requires at least 1 objective --> dummy objective (SamplePilot, line 64)
162  if ( (this->objectives_.size() > 1) && filesToKeep_m.empty() ) {
163  sim->cleanUp();
164  } else {
165  // if empty, we keep all files
166  sim->cleanUp(filesToKeep_m);
167  }
168 
169  } catch(OptPilotException &ex) {
170  std::cout << "Exception while running simulation: "
171  << ex.what() << std::endl;
172  }
173 
174  MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, this->pilot_rank_,
176 
177  size_t dummy = 0;
178  MPI_Recv(&dummy, 1, MPI_UNSIGNED_LONG, this->pilot_rank_,
179  MPI_WORKER_FINISHED_ACK_TAG, this->comm_m, &status);
180 
181  MPI_Send_reqvars(requested_results, (size_t)this->pilot_rank_, this->comm_m);
182 
183  this->is_idle_ = true;
184  return true;
185 
186  } else {
187  std::stringstream os;
188  os << "Unexpected MPI_TAG: " << status.MPI_TAG;
189  std::cout << "(Worker) Error: " << os.str() << std::endl;
190  throw OptPilotException("SampleWorker::onMessage", os.str());
191  }
192  }
193 
194 private:
195  const std::vector<std::string> filesToKeep_m;
196 };
197 
198 #endif
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:64
MPI_Comm coworker_comm_
Definition: Worker.h:87
virtual const char * what() const
void runSlave()
Definition: SampleWorker.h:66
CmdArguments_t cmd_args_
Definition: Worker.h:152
#define MPI_WORKER_FINISHED_TAG
notify pilot that work has been finished and results are ready to collect
Definition: MPIHelper.h:15
int pilot_rank_
Definition: Worker.h:150
boost::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:169
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies worker that it is ready to collect the results
Definition: MPIHelper.h:17
const std::vector< std::string > statVariablesToStore_m
Definition: SampleWorker.h:51
A worker MPI entity consists of a processor group that runs a simulation of type Sim_t. The main loop in run() accepts new jobs from the master process, runs the simulation and reports back the results.
Definition: Worker.h:28
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:125
int leader_pid_
Definition: Worker.h:148
bool is_idle_
Definition: Worker.h:86
namedVariableCollection_t Param_t
Definition: Types.h:33
SampleWorker(Expressions::Named_t objectives, Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args, const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep)
Definition: SampleWorker.h:24
boost::scoped_ptr< Sim_t > SimPtr_t
Definition: Worker.h:84
A worker MPI entity consists of a processor group that runs a simulation of type Sim_t. The main loop in run() accepts new jobs from the master process, runs the simulation and reports back the results.
Definition: SampleWorker.h:20
virtual void run()
Definition: Poller.h:62
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:32
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:107
int num_coworkers_
Definition: Worker.h:149
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expression with a name
Definition: Expression.h:45
const std::vector< std::string > filesToKeep_m
Definition: SampleWorker.h:195
#define MPI_COWORKER_NEW_JOB_TAG
notifies coworkers about new jobs
Definition: MPIHelper.h:20
std::string simulation_name_
Definition: Worker.h:151
void MPI_Bcast_params(Param_t &params, size_t root, MPI_Comm comm)
broadcast params to all entities in comm
Definition: MPIHelper.cpp:39
Expressions::Named_t objectives_
Definition: Worker.h:89
bundles all communicators for a specific role/pid
Definition: types.h:14
bool onMessage(MPI_Status status, size_t recv_value) override
Definition: SampleWorker.h:121
void notifyCoWorkers(size_t job_id, int tag)
notify coworkers of incoming broadcast
Definition: SampleWorker.h:54
bool is_running_
Definition: Poller.h:37
MPI_Comm comm_m
communicator on which the poller listens for requests
Definition: Poller.h:35
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
#define MPI_STOP_TAG
global stop tag to exit poll loop (see Poller)
Definition: MPIHelper.h:44
Expressions::Named_t constraints_
Definition: Worker.h:90