OPAL (Object Oriented Parallel Accelerator Library) 2022.1
OPAL
Worker.h
Go to the documentation of this file.
1//
2// Class Worker
3// A worker MPI entity consists of a processor group that runs a
4// simulation of type Sim_t. The main loop in run() accepts new jobs from the
5// master process, runs the simulation and reports back the results.
6//
7// @see Pilot
8// @see Poller
9// @see MPIHelper.h
10//
11// @tparam Sim_t type of simulation to run
12//
13// Copyright (c) 2010 - 2013, Yves Ineichen, ETH Zürich
14// All rights reserved
15//
16// Implemented as part of the PhD thesis
17// "Toward massively parallel multi-objective optimization with application to
18// particle accelerators" (https://doi.org/10.3929/ethz-a-009792359)
19//
20// This file is part of OPAL.
21//
22// OPAL is free software: you can redistribute it and/or modify
23// it under the terms of the GNU General Public License as published by
24// the Free Software Foundation, either version 3 of the License, or
25// (at your option) any later version.
26//
27// You should have received a copy of the GNU General Public License
28// along with OPAL. If not, see <https://www.gnu.org/licenses/>.
29//
30#ifndef __WORKER_H__
31#define __WORKER_H__
32
33#include <iostream>
34
35#include "boost/smart_ptr.hpp"
36
37#include "Pilot/Poller.h"
38
39#include "Comm/types.h"
40#include "Util/Types.h"
41#include "Util/MPIHelper.h"
42#include "Util/CmdArguments.h"
43
44template <class Sim_t>
45class Worker : protected Poller {
46
47public:
48
50 std::string simName,
51 Comm::Bundle_t comms,
52 CmdArguments_t args)
53 : Poller(comms.worker)
54 , cmd_args_(args)
55 {
56 constraints_ = constraints;
57 simulation_name_ = simName;
59 is_idle_ = true;
61
62 leader_pid_ = 0;
63 MPI_Comm_size(coworker_comm_, &num_coworkers_);
64 }
65
67 Expressions::Named_t constraints,
68 std::string simName,
69 Comm::Bundle_t comms,
70 CmdArguments_t args,
71 const std::map<std::string, std::string> &userVariables,
72 bool isOptimizer = true)
73 : Poller(comms.worker)
74 , cmd_args_(args)
75 , userVariables_(userVariables)
76 {
77 objectives_ = objectives;
78 constraints_ = constraints;
79 simulation_name_ = simName;
81 is_idle_ = true;
83
84 leader_pid_ = 0;
85 MPI_Comm_size(coworker_comm_, &num_coworkers_);
86
87 if (!isOptimizer) return;
88 int my_local_pid = 0;
89 MPI_Comm_rank(coworker_comm_, &my_local_pid);
90
91 // distinction between leader and coworkers
92 if(my_local_pid == leader_pid_)
93 run();
94 else
96 }
97
99 {}
100
101
102protected:
103 typedef boost::scoped_ptr<Sim_t> SimPtr_t;
104
107
110
111
// Loop executed by coworker ranks: waits for notifications from the leader
// rank (leader_pid_) on coworker_comm_ and reacts to their MPI tag.
//  - MPI_COWORKER_NEW_JOB_TAG: construct a Sim_t and run it, then wait for
//    the next notification.
//  - MPI_STOP_TAG: leave the loop.
// NOTE(review): this listing skips original lines 139 and 143-144, so the
// statement following `Param_t params;` and the remaining Sim_t constructor
// arguments are not visible here.
114 void runCoWorker() {
115
116 MPI_Request stop_req;
117 size_t stop_value = 0;
118
// Post a non-blocking receive for one value from the leader; any tag is
// accepted, the tag itself carries the command.
119 MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
120 MPI_ANY_TAG, coworker_comm_, &stop_req);
121 is_running_ = true;
122
// NOTE(review): apart from the simulation run, the visible part of this loop
// only polls with MPI_Test, i.e. it spins while no message is pending.
123 while(is_running_) {
124
125 //FIXME: bcast blocks after our leader stopped working
126 // Either we create a new class implementing a coworker in the
127 // same manner as the worker (poll loop). Anyway there is no way
128 // around removing the Bcast and adding another tag in the poll
129 // loop above in order to be able to exit cleanly.
130 if(stop_req != MPI_REQUEST_NULL) {
131 MPI_Status status;
132 int flag = 0;
133 MPI_Test(&stop_req, &flag, &status);
134
// flag is set by MPI_Test once the pending receive has completed.
135 if(flag) {
136
137 if(status.MPI_TAG == MPI_COWORKER_NEW_JOB_TAG) {
138 Param_t params;
140
// A failing simulation is only reported on stdout; the loop keeps going.
141 try {
142 SimPtr_t sim(new Sim_t(objectives_, constraints_,
145
146 sim->run();
147 } catch(OptPilotException &ex) {
148 std::cout << "Exception while running simulation: "
149 << ex.what() << std::endl;
150 }
// Re-arm the receive so the next notification from the leader is seen.
151 MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
152 MPI_ANY_TAG, coworker_comm_, &stop_req);
153 }
154
155 if(status.MPI_TAG == MPI_STOP_TAG) {
156 is_running_ = false;
157 break;
158 }
159 }
160 }
161 }
162 }
163
164
165protected:
166
170 std::string simulation_name_;
172
173 const std::map<std::string, std::string> userVariables_;
174
176 void notifyCoWorkers(int tag) {
177
178 for(int i=0; i < num_coworkers_; i++) {
179 if(i == leader_pid_) continue;
180
181 size_t dummy = 0;
182 MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, i, tag, coworker_comm_);
183 }
184 }
185
// Executed before the polling loop starts: sends a single unsigned long
// (value 1) to pilot_rank_.
// NOTE(review): original line 189, holding the remaining MPI_Send arguments
// (tag and communicator), is missing from this listing.
186 void setupPoll() {
187 size_t dummy = 1;
188 MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
190 }
191
/// Executed before checking for a new request; intentionally does nothing.
void prePoll() {
}
194
/// Executed after handling a new request (if any); intentionally does nothing.
void postPoll() {
}
197
// Reaction to the STOP tag; only acts when num_coworkers_ is greater than 1.
// NOTE(review): original line 200, the statement guarded by the `if`, is
// missing from this listing.
198 void onStop() {
199 if(num_coworkers_ > 1)
201 }
202
// Handles one message received by the poll loop.
//
// For MPI_WORK_JOBID_TAG the received value is used as the job id: the job
// parameters are fetched from the pilot, a Sim_t is constructed and run, and
// the results are sent back to the pilot. Any other tag is an error.
//
// @param status      MPI status of the received message; only MPI_TAG is read
// @param recv_value  value received with the message, used as the job id
// @return true once a job has been processed
// @throws OptPilotException for any tag other than MPI_WORK_JOBID_TAG
//
// NOTE(review): this listing skips original lines 216-217, 227-228, 241 and
// 245, so the coworker forwarding, the remaining Sim_t constructor arguments
// and the trailing MPI_Send/MPI_Recv arguments are not visible here.
203 virtual bool onMessage(MPI_Status status, size_t recv_value) {
204
205 if(status.MPI_TAG == MPI_WORK_JOBID_TAG) {
206
207 is_idle_ = false;
208 size_t job_id = recv_value;
209
210 // get new job
211 Param_t params;
212 MPI_Recv_params(params, (size_t)pilot_rank_, comm_m);
213
214 // and forward to coworkers (if any)
215 if(num_coworkers_ > 1) {
218 }
219
220 //XXX we need to know if we want EVAL or DERIVATIVE
221 //reqVarContainer_t reqVars;
222 //MPI_Recv_reqvars(reqVars, (size_t)pilot_rank_, comm_m);
223
// Declared outside the try block: if the simulation throws, the container
// stays empty and is still sent to the pilot below.
224 reqVarContainer_t requested_results;
225 try {
226 SimPtr_t sim(new Sim_t(objectives_, constraints_,
229
230 // run simulation in a "blocking" fashion
231 sim->run();
232 sim->collectResults();
233 sim->cleanUp();
234 requested_results = sim->getResults();
235 } catch(OptPilotException &ex) {
236 std::cout << "Exception while running simulation: "
237 << ex.what() << std::endl;
238 }
239
// Exchange with the pilot: send the job id, receive one value back from the
// pilot, then send the requested results.
240 MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, pilot_rank_,
242
243 size_t dummy = 0;
244 MPI_Recv(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
246
247 MPI_Send_reqvars(requested_results, (size_t)pilot_rank_, comm_m);
248
249 is_idle_ = true;
250 return true;
251
252 } else {
// Unknown tag: report it on stdout and raise with the same message.
253 std::stringstream os;
254 os << "Unexpected MPI_TAG: " << status.MPI_TAG;
255 std::cout << "(Worker) Error: " << os.str() << std::endl;
256 throw OptPilotException("Worker::onMessage", os.str());
257 }
258 }
259};
260
261#endif
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:52
#define MPI_WORKER_FINISHED_TAG
notify pilot that work has been finished and results are ready to collect
Definition: MPIHelper.h:35
#define MPI_COWORKER_NEW_JOB_TAG
notifies coworkers about new jobs
Definition: MPIHelper.h:40
#define MPI_WORKER_STATUSUPDATE_TAG
notify pilot about worker status
Definition: MPIHelper.h:33
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies worker that he is ready to collect the results
Definition: MPIHelper.h:37
#define MPI_STOP_TAG
global stop tag to exit poll loop
Definition: MPIHelper.h:64
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:146
void MPI_Bcast_params(Param_t &params, size_t root, MPI_Comm comm)
broadcast params to all entities in comm
Definition: MPIHelper.cpp:60
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:128
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:79
namedVariableCollection_t Param_t
Definition: Types.h:48
boost::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:176
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expression with a name
Definition: Expression.h:74
bundles all communicators for a specific role/pid
Definition: types.h:32
MPI_Comm coworkers
Definition: types.h:39
int master_local_pid
Definition: types.h:36
Definition: Poller.h:35
bool is_running_
Definition: Poller.h:54
MPI_Comm comm_m
communicator the poller listens to requests
Definition: Poller.h:52
virtual void run()
Definition: Poller.h:79
Definition: Worker.h:45
Expressions::Named_t objectives_
Definition: Worker.h:108
~Worker()
Definition: Worker.h:98
Worker(Expressions::Named_t objectives, Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args, const std::map< std::string, std::string > &userVariables, bool isOptimizer=true)
Definition: Worker.h:66
std::string simulation_name_
Definition: Worker.h:170
void runCoWorker()
Definition: Worker.h:114
int pilot_rank_
Definition: Worker.h:169
int num_coworkers_
Definition: Worker.h:168
void postPoll()
executed after handling (if any) new request
Definition: Worker.h:195
void onStop()
enable implementation to react to STOP tag
Definition: Worker.h:198
MPI_Comm coworker_comm_
Definition: Worker.h:106
void notifyCoWorkers(int tag)
notify coworkers of incoming broadcast
Definition: Worker.h:176
void prePoll()
executed before checking for new request
Definition: Worker.h:192
CmdArguments_t cmd_args_
Definition: Worker.h:171
int leader_pid_
Definition: Worker.h:167
const std::map< std::string, std::string > userVariables_
Definition: Worker.h:173
void setupPoll()
executed before starting polling loop
Definition: Worker.h:186
Worker(Expressions::Named_t constraints, std::string simName, Comm::Bundle_t comms, CmdArguments_t args)
Definition: Worker.h:49
virtual bool onMessage(MPI_Status status, size_t recv_value)
Definition: Worker.h:203
boost::scoped_ptr< Sim_t > SimPtr_t
Definition: Worker.h:103
Expressions::Named_t constraints_
Definition: Worker.h:109
bool is_idle_
Definition: Worker.h:105
virtual const char * what() const