OPAL (Object Oriented Parallel Accelerator Library)  2021.1.99
OPAL
SampleWorker.h
Go to the documentation of this file.
1 //
2 // Class SampleWorker
3 // A worker MPI entity consists of a processor group that runs a
4 // simulation of type Sim_t. The main loop in run() accepts new jobs from the
5 // master process, runs the simulation and reports back the results.
6 //
7 // @see SamplePilot
8 // @see Worker
9 // @see MPIHelper.h
10 //
11 // @tparam Sim_t type of simulation to run
12 //
13 // Copyright (c) 2018, Matthias Frey, Paul Scherrer Institut, Villigen PSI, Switzerland
14 // Yves Ineichen, ETH Zürich
15 // All rights reserved
16 //
17 // Implemented as part of the PhD thesis
18 // "Precise Simulations of Multibunches in High Intensity Cyclotrons"
19 //
20 // This file is part of OPAL.
21 //
22 // OPAL is free software: you can redistribute it and/or modify
23 // it under the terms of the GNU General Public License as published by
24 // the Free Software Foundation, either version 3 of the License, or
25 // (at your option) any later version.
26 //
27 // You should have received a copy of the GNU General Public License
28 // along with OPAL. If not, see <https://www.gnu.org/licenses/>.
29 //
30 #ifndef __SAMPLE_WORKER_H__
31 #define __SAMPLE_WORKER_H__
32 
33 #include "Pilot/Worker.h"
34 
35 template <class Sim_t>
36 class SampleWorker : protected Worker<Sim_t> {
37 
38 public:
39 
41  Expressions::Named_t constraints,
42  std::string simName,
43  Comm::Bundle_t comms,
44  CmdArguments_t args,
45  const std::vector<std::string> &storeobjstr,
46  const std::vector<std::string> &filesToKeep,
47  const std::map<std::string, std::string> &userVariables)
48  : Worker<Sim_t>(objectives, constraints, simName, comms, args, userVariables, false)
49  , statVariablesToStore_m(storeobjstr)
50  , filesToKeep_m(filesToKeep)
51  {
52 
53  int my_local_pid = 0;
54  MPI_Comm_rank(this->coworker_comm_, &my_local_pid);
55 
56  // distinction between leader and coworkers
57  if(my_local_pid == this->leader_pid_)
58  this->run();
59  else
60  runSlave();
61  }
62 
64  {}
65 
66 protected:
67  // FIXME Open issue #250 (https://gitlab.psi.ch/OPAL/src/issues/250)
68  const std::vector<std::string> statVariablesToStore_m;
69 
71  void notifyCoWorkers(size_t job_id, int tag) {
72 
73  for(int i=0; i < this->num_coworkers_; i++) {
74  if(i == this->leader_pid_) continue;
75 
76  // send job id to co workers
77  MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, i, tag, this->coworker_comm_);
78  }
79  }
80 
83  void runSlave() {
84  /* needs to be executed by derived class otherwise
85  * a base class instance is created.
86  */
87 
88  MPI_Request stop_req;
89  size_t job_id = 0;
90 
91  MPI_Irecv(&job_id, 1, MPI_UNSIGNED_LONG, this->leader_pid_,
92  MPI_ANY_TAG, this->coworker_comm_, &stop_req);
93  this->is_running_ = true;
94 
95  while(this->is_running_) {
96 
97  //FIXME: bcast blocks after our leader stopped working
98  // Either we create a new class implementing a coworker in the
99  // same manner as the worker (poll loop). Anyway there is no way
100  // around removing the Bcast and adding another tag in the poll
101  // loop above in order to be able to exit cleanly.
102  if(stop_req != MPI_REQUEST_NULL) {
103  MPI_Status status;
104  int flag = 0;
105  MPI_Test(&stop_req, &flag, &status);
106 
107  if(flag) {
108 
109  if(status.MPI_TAG == MPI_COWORKER_NEW_JOB_TAG) {
110  Param_t params;
111  MPI_Bcast_params(params, this->leader_pid_, this->coworker_comm_);
112 
113  try {
114  typename Worker<Sim_t>::SimPtr_t sim(
115  new Sim_t(this->objectives_, this->constraints_,
116  params, this->simulation_name_, this->coworker_comm_,
117  this->cmd_args_, this->userVariables_));
118 
119  sim->setFilename(job_id);
120 
121  sim->run();
122  } catch(OptPilotException &ex) {
123  std::cout << "Exception while running simulation: "
124  << ex.what() << std::endl;
125  }
126  MPI_Irecv(&job_id, 1, MPI_UNSIGNED_LONG, this->leader_pid_,
127  MPI_ANY_TAG, this->coworker_comm_, &stop_req);
128  }
129 
130  if(status.MPI_TAG == MPI_STOP_TAG) {
131  this->is_running_ = false;
132  break;
133  }
134  }
135  }
136  }
137  }
138 
139  bool onMessage(MPI_Status status, size_t recv_value) override {
140 
141  if(status.MPI_TAG == MPI_WORK_JOBID_TAG) {
142 
143  this->is_idle_ = false;
144  size_t job_id = recv_value;
145 
146  // get new job
147  Param_t params;
148  MPI_Recv_params(params, (size_t)this->pilot_rank_, this->comm_m);
149 
150  // and forward to coworkers (if any)
151  if(this->num_coworkers_ > 1) {
153  MPI_Bcast_params(params, this->leader_pid_, this->coworker_comm_);
154  }
155 
156  reqVarContainer_t requested_results;
157  try {
158  typename Worker<Sim_t>::SimPtr_t sim(new Sim_t(this->objectives_,
159  this->constraints_,
160  params,
161  this->simulation_name_,
162  this->coworker_comm_,
163  this->cmd_args_,
164  this->userVariables_));
165 
166  sim->setFilename(job_id);
167 
168  // run simulation in a "blocking" fashion
169  sim->run();
170 
171  // this is requests the columns from the stat file and stores them
172  // in a map with the column names as key and the columns as values; for #250
173  //
174  // std::map<std::string,
175  // std::vector<double> > data = sim->getData(statVariablesToStore_m);
176 
177  sim->collectResults();
178  requested_results = sim->getResults();
179 
180  // base clase of SamplePilot requires at least 1 objective --> dummy objective (SamplePilot, line 64)
181  if ( (this->objectives_.size() > 1) && filesToKeep_m.empty() ) {
182  sim->cleanUp();
183  } else {
184  // if empty, we keep all files
185  sim->cleanUp(filesToKeep_m);
186  }
187 
188  } catch(OptPilotException &ex) {
189  std::cout << "Exception while running simulation: "
190  << ex.what() << std::endl;
191  }
192 
193  MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, this->pilot_rank_,
195 
196  size_t dummy = 0;
197  MPI_Recv(&dummy, 1, MPI_UNSIGNED_LONG, this->pilot_rank_,
198  MPI_WORKER_FINISHED_ACK_TAG, this->comm_m, &status);
199 
200  MPI_Send_reqvars(requested_results, (size_t)this->pilot_rank_, this->comm_m);
201 
202  this->is_idle_ = true;
203  return true;
204 
205  } else {
206  std::stringstream os;
207  os << "Unexpected MPI_TAG: " << status.MPI_TAG;
208  std::cout << "(Worker) Error: " << os.str() << std::endl;
209  throw OptPilotException("SampleWorker::onMessage", os.str());
210  }
211  }
212 
213 private:
214  const std::vector<std::string> filesToKeep_m;
215 };
216 
217 #endif
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
boost::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:176
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:146
void MPI_Bcast_params(Param_t &params, size_t root, MPI_Comm comm)
broadcast params to all entities in comm
Definition: MPIHelper.cpp:60
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:128
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:52
#define MPI_WORKER_FINISHED_TAG
notify pilot that work has been finished and results are ready to collect
Definition: MPIHelper.h:35
#define MPI_COWORKER_NEW_JOB_TAG
notifies coworkers about new jobs
Definition: MPIHelper.h:40
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies worker that he is ready to collect the results
Definition: MPIHelper.h:37
#define MPI_STOP_TAG
global stop tag to exit poll loop (
Definition: MPIHelper.h:64
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:79
namedVariableCollection_t Param_t
Definition: Types.h:48
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expressions with a name
Definition: Expression.h:74
void runSlave()
Definition: SampleWorker.h:83
const std::vector< std::string > statVariablesToStore_m
Definition: SampleWorker.h:68
bool onMessage(MPI_Status status, size_t recv_value) override
Definition: SampleWorker.h:139
void notifyCoWorkers(size_t job_id, int tag)
notify coworkers of incoming broadcast
Definition: SampleWorker.h:71
const std::vector< std::string > filesToKeep_m
Definition: SampleWorker.h:214
SampleWorker(Expressions::Named_t objectives, Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args, const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep, const std::map< std::string, std::string > &userVariables)
Definition: SampleWorker.h:40
bundles all communicators for a specific role/pid
Definition: types.h:32
bool is_running_
Definition: Poller.h:54
MPI_Comm comm_m
communicator the poller listens to requests
Definition: Poller.h:52
virtual void run()
Definition: Poller.h:79
Definition: Worker.h:45
Expressions::Named_t objectives_
Definition: Worker.h:108
std::string simulation_name_
Definition: Worker.h:170
int pilot_rank_
Definition: Worker.h:169
int num_coworkers_
Definition: Worker.h:168
MPI_Comm coworker_comm_
Definition: Worker.h:106
CmdArguments_t cmd_args_
Definition: Worker.h:171
int leader_pid_
Definition: Worker.h:167
const std::map< std::string, std::string > userVariables_
Definition: Worker.h:173
boost::scoped_ptr< Sim_t > SimPtr_t
Definition: Worker.h:103
Expressions::Named_t constraints_
Definition: Worker.h:109
bool is_idle_
Definition: Worker.h:105
virtual const char * what() const