OPAL (Object Oriented Parallel Accelerator Library) 2022.1
OPAL
SampleWorker.h
Go to the documentation of this file.
1//
2// Class SampleWorker
3// A worker MPI entity consists of a processor group that runs a
4// simulation of type Sim_t. The main loop in run() accepts new jobs from the
5// master process, runs the simulation and reports back the results.
6//
7// @see SamplePilot
8// @see Worker
9// @see MPIHelper.h
10//
11// @tparam Sim_t type of simulation to run
12//
13// Copyright (c) 2018, Matthias Frey, Paul Scherrer Institut, Villigen PSI, Switzerland
14// Yves Ineichen, ETH Zürich
15// All rights reserved
16//
17// Implemented as part of the PhD thesis
18// "Precise Simulations of Multibunches in High Intensity Cyclotrons"
19//
20// This file is part of OPAL.
21//
22// OPAL is free software: you can redistribute it and/or modify
23// it under the terms of the GNU General Public License as published by
24// the Free Software Foundation, either version 3 of the License, or
25// (at your option) any later version.
26//
27// You should have received a copy of the GNU General Public License
28// along with OPAL. If not, see <https://www.gnu.org/licenses/>.
29//
30#ifndef __SAMPLE_WORKER_H__
31#define __SAMPLE_WORKER_H__
32
33#include "Pilot/Worker.h"
34
35template <class Sim_t>
36class SampleWorker : protected Worker<Sim_t> {
37
38public:
39
41 Expressions::Named_t constraints,
42 std::string simName,
43 Comm::Bundle_t comms,
44 CmdArguments_t args,
45 const std::vector<std::string> &storeobjstr,
46 const std::vector<std::string> &filesToKeep,
47 const std::map<std::string, std::string> &userVariables)
48 : Worker<Sim_t>(objectives, constraints, simName, comms, args, userVariables, false)
49 , statVariablesToStore_m(storeobjstr)
50 , filesToKeep_m(filesToKeep)
51 {
52
53 int my_local_pid = 0;
54 MPI_Comm_rank(this->coworker_comm_, &my_local_pid);
55
56 // distinction between leader and coworkers
57 if(my_local_pid == this->leader_pid_)
58 this->run();
59 else
60 runSlave();
61 }
62
64 {}
65
66protected:
67 // FIXME Open issue #250 (https://gitlab.psi.ch/OPAL/src/issues/250)
68 const std::vector<std::string> statVariablesToStore_m;
69
71 void notifyCoWorkers(size_t job_id, int tag) {
72
73 for(int i=0; i < this->num_coworkers_; i++) {
74 if(i == this->leader_pid_) continue;
75
76 // send job id to co workers
77 MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, i, tag, this->coworker_comm_);
78 }
79 }
80
83 void runSlave() {
84 /* needs to be executed by derived class otherwise
85 * a base class instance is created.
86 */
87
88 MPI_Request stop_req;
89 size_t job_id = 0;
90
91 MPI_Irecv(&job_id, 1, MPI_UNSIGNED_LONG, this->leader_pid_,
92 MPI_ANY_TAG, this->coworker_comm_, &stop_req);
93 this->is_running_ = true;
94
95 while(this->is_running_) {
96
97 //FIXME: bcast blocks after our leader stopped working
98 // Either we create a new class implementing a coworker in the
99 // same manner as the worker (poll loop). Anyway there is no way
100 // around removing the Bcast and adding another tag in the poll
101 // loop above in order to be able to exit cleanly.
102 if(stop_req != MPI_REQUEST_NULL) {
103 MPI_Status status;
104 int flag = 0;
105 MPI_Test(&stop_req, &flag, &status);
106
107 if(flag) {
108
109 if(status.MPI_TAG == MPI_COWORKER_NEW_JOB_TAG) {
110 Param_t params;
111 MPI_Bcast_params(params, this->leader_pid_, this->coworker_comm_);
112
113 try {
114 typename Worker<Sim_t>::SimPtr_t sim(
115 new Sim_t(this->objectives_, this->constraints_,
116 params, this->simulation_name_, this->coworker_comm_,
117 this->cmd_args_, this->userVariables_));
118
119 sim->setFilename(job_id);
120
121 sim->run();
122 } catch(OptPilotException &ex) {
123 std::cout << "Exception while running simulation: "
124 << ex.what() << std::endl;
125 }
126 MPI_Irecv(&job_id, 1, MPI_UNSIGNED_LONG, this->leader_pid_,
127 MPI_ANY_TAG, this->coworker_comm_, &stop_req);
128 }
129
130 if(status.MPI_TAG == MPI_STOP_TAG) {
131 this->is_running_ = false;
132 break;
133 }
134 }
135 }
136 }
137 }
138
139 bool onMessage(MPI_Status status, size_t recv_value) override {
140
141 if(status.MPI_TAG == MPI_WORK_JOBID_TAG) {
142
143 this->is_idle_ = false;
144 size_t job_id = recv_value;
145
146 // get new job
147 Param_t params;
148 MPI_Recv_params(params, (size_t)this->pilot_rank_, this->comm_m);
149
150 // and forward to coworkers (if any)
151 if(this->num_coworkers_ > 1) {
153 MPI_Bcast_params(params, this->leader_pid_, this->coworker_comm_);
154 }
155
156 reqVarContainer_t requested_results;
157 try {
158 typename Worker<Sim_t>::SimPtr_t sim(new Sim_t(this->objectives_,
159 this->constraints_,
160 params,
161 this->simulation_name_,
162 this->coworker_comm_,
163 this->cmd_args_,
164 this->userVariables_));
165
166 sim->setFilename(job_id);
167
168 // run simulation in a "blocking" fashion
169 sim->run();
170
171 // this is requests the columns from the stat file and stores them
172 // in a map with the column names as key and the columns as values; for #250
173 //
174 // std::map<std::string,
175 // std::vector<double> > data = sim->getData(statVariablesToStore_m);
176
177 sim->collectResults();
178 requested_results = sim->getResults();
179
180 // base clase of SamplePilot requires at least 1 objective --> dummy objective (SamplePilot, line 64)
181 if ( (this->objectives_.size() > 1) && filesToKeep_m.empty() ) {
182 sim->cleanUp();
183 } else {
184 // if empty, we keep all files
185 sim->cleanUp(filesToKeep_m);
186 }
187
188 } catch(OptPilotException &ex) {
189 std::cout << "Exception while running simulation: "
190 << ex.what() << std::endl;
191 }
192
193 MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, this->pilot_rank_,
195
196 size_t dummy = 0;
197 MPI_Recv(&dummy, 1, MPI_UNSIGNED_LONG, this->pilot_rank_,
198 MPI_WORKER_FINISHED_ACK_TAG, this->comm_m, &status);
199
200 MPI_Send_reqvars(requested_results, (size_t)this->pilot_rank_, this->comm_m);
201
202 this->is_idle_ = true;
203 return true;
204
205 } else {
206 std::stringstream os;
207 os << "Unexpected MPI_TAG: " << status.MPI_TAG;
208 std::cout << "(Worker) Error: " << os.str() << std::endl;
209 throw OptPilotException("SampleWorker::onMessage", os.str());
210 }
211 }
212
213private:
214 const std::vector<std::string> filesToKeep_m;
215};
216
217#endif
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:52
#define MPI_WORKER_FINISHED_TAG
notify pilot that work has been finished and results are ready to collect
Definition: MPIHelper.h:35
#define MPI_COWORKER_NEW_JOB_TAG
notifies coworkers about new jobs
Definition: MPIHelper.h:40
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies worker that it is ready to collect the results
Definition: MPIHelper.h:37
#define MPI_STOP_TAG
global stop tag to exit poll loop
Definition: MPIHelper.h:64
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:146
void MPI_Bcast_params(Param_t &params, size_t root, MPI_Comm comm)
broadcast params to all entities in comm
Definition: MPIHelper.cpp:60
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:128
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:79
namedVariableCollection_t Param_t
Definition: Types.h:48
boost::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:176
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expressions with a name
Definition: Expression.h:74
void runSlave()
Definition: SampleWorker.h:83
const std::vector< std::string > statVariablesToStore_m
Definition: SampleWorker.h:68
bool onMessage(MPI_Status status, size_t recv_value) override
Definition: SampleWorker.h:139
void notifyCoWorkers(size_t job_id, int tag)
notify coworkers of incoming broadcast
Definition: SampleWorker.h:71
const std::vector< std::string > filesToKeep_m
Definition: SampleWorker.h:214
SampleWorker(Expressions::Named_t objectives, Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args, const std::vector< std::string > &storeobjstr, const std::vector< std::string > &filesToKeep, const std::map< std::string, std::string > &userVariables)
Definition: SampleWorker.h:40
bundles all communicators for a specific role/pid
Definition: types.h:32
bool is_running_
Definition: Poller.h:54
MPI_Comm comm_m
communicator the poller listens to requests
Definition: Poller.h:52
virtual void run()
Definition: Poller.h:79
Definition: Worker.h:45
Expressions::Named_t objectives_
Definition: Worker.h:108
std::string simulation_name_
Definition: Worker.h:170
int pilot_rank_
Definition: Worker.h:169
int num_coworkers_
Definition: Worker.h:168
MPI_Comm coworker_comm_
Definition: Worker.h:106
CmdArguments_t cmd_args_
Definition: Worker.h:171
int leader_pid_
Definition: Worker.h:167
const std::map< std::string, std::string > userVariables_
Definition: Worker.h:173
boost::scoped_ptr< Sim_t > SimPtr_t
Definition: Worker.h:103
Expressions::Named_t constraints_
Definition: Worker.h:109
bool is_idle_
Definition: Worker.h:105
virtual const char * what() const