OPAL (Object Oriented Parallel Accelerator Library) 2022.1
OPAL
Pilot.h
Go to the documentation of this file.
1//
2// Class Pilot
3// The Optimization Pilot (Master): Coordinates requests by optimizer
4// to workers and reports results back on a given communicator.
5//
6// Every worker thread notifies the master here if idle or not. When
7// available the master dispatches one of the pending simulations to the
8// worker who will run the specified simulation and report results back to
9// the master. The Optimizer class will poll the scheduler to check if some
10// (or all) results are available and continue to optimize and request new
11// simulation results.
12//
13// @see Worker
14// @see Optimizer
15//
16// @tparam Opt_t type of the optimizer
17// @tparam Sim_t type of the simulation
18// @tparam SolPropagationGraph_t strategy to distribute solution between
19// master islands
20// @tparam Comm_t comm splitter strategy
21//
22// Copyright (c) 2010 - 2013, Yves Ineichen, ETH Zürich
23// All rights reserved
24//
25// Implemented as part of the PhD thesis
26// "Toward massively parallel multi-objective optimization with application to
27// particle accelerators" (https://doi.org/10.3929/ethz-a-009792359)
28//
29// This file is part of OPAL.
30//
31// OPAL is free software: you can redistribute it and/or modify
32// it under the terms of the GNU General Public License as published by
33// the Free Software Foundation, either version 3 of the License, or
34// (at your option) any later version.
35//
36// You should have received a copy of the GNU General Public License
37// along with OPAL. If not, see <https://www.gnu.org/licenses/>.
38//
39#ifndef __PILOT_H__
40#define __PILOT_H__
41
42#include <mpi.h>
43#include <iostream>
44#include <string>
45#include <unistd.h>
46
47#include "boost/smart_ptr.hpp"
48//#include "boost/dynamic_bitset.hpp"
49
50#include "Comm/MasterNode.h"
51#include "Comm/CommSplitter.h"
52
53#include "Util/Types.h"
54#include "Util/CmdArguments.h"
56
57#include "Pilot/Poller.h"
58#include "Pilot/Worker.h"
59#include "Optimizer/Optimizer.h"
60
61#include "Util/Trace/Trace.h"
62#include "Util/Trace/FileSink.h"
64
66
67
96template <
97 class Opt_t
98 , class Sim_t
99 , class SolPropagationGraph_t
100 , class Comm_t
101>
102class Pilot : protected Poller {
103
104public:
105
    // Constructor intended only for Pilot subclasses that provide their
    // own setup routine: stores the communicator, command-line arguments
    // and design variables, but does NOT branch into the
    // optimizer/worker/pilot roles (no setup() call here).
    Pilot(CmdArguments_t args, boost::shared_ptr<Comm_t> comm,
          const DVarContainer_t &dvar)
        : Poller(comm->mpiComm())
        , comm_(comm)
        , cmd_args_(args)
        , dvars_(dvar)
    {
        // do nothing
    }
117
    // Constructor that immediately runs setup() with the given expression
    // function dictionary.
    // NOTE(review): setup() as visible in this file takes a second
    // userVariables parameter with no default; this one-argument call
    // suggests a default argument (or overload) was lost in this copy —
    // confirm against the original file.
    Pilot(CmdArguments_t args, boost::shared_ptr<Comm_t> comm,
          functionDictionary_t known_expr_funcs)
        : Poller(comm->mpiComm())
        , comm_(comm)
        , cmd_args_(args)
    {
        setup(known_expr_funcs);
    }
126
    // Main constructor: receives the full optimization problem
    // (objectives, constraints, design variables), an optional
    // hypervolume reference point, and user-defined template variables
    // forwarded to the workers. Unless isOptimizerRun is false, setup()
    // is called, which branches this MPI rank into its role
    // (optimizer / worker / pilot) and only returns when that role ends.
    Pilot(CmdArguments_t args, boost::shared_ptr<Comm_t> comm,
          functionDictionary_t known_expr_funcs,
          const DVarContainer_t &dvar,
          const Expressions::Named_t &obj,
          const Expressions::Named_t &cons,
          std::vector<double> hypervolRef = {},
          bool isOptimizerRun = true,
          const std::map<std::string, std::string> &userVariables = {})
        : Poller(comm->mpiComm())
        , comm_(comm)
        , cmd_args_(args)
        , objectives_(obj)
        , constraints_(cons)
        , dvars_(dvar)
        , hypervolRef_(hypervolRef)
    {
        if (isOptimizerRun)
            setup(known_expr_funcs, userVariables);
    }
146
147 virtual ~Pilot()
148 {
149 for (auto itr = objectives_.begin(); itr != objectives_.end(); ++ itr)
150 delete itr->second;
151
152 for (auto itr = constraints_.begin(); itr != constraints_.end(); ++ itr)
153 delete itr->second;
154 }
155
156
157protected:
158
160 MPI_Comm worker_comm_;
162 MPI_Comm opt_comm_;
165
166 boost::shared_ptr<Comm_t> comm_;
168
172
174
175 typedef MasterNode< typename Opt_t::SolutionState_t,
176 SolPropagationGraph_t > MasterNode_t;
177 boost::scoped_ptr< MasterNode_t > master_node_;
178
180 std::string input_file_;
181
185
189 std::vector<double> hypervolRef_;
190
191
192 // keep track of state of all workers
193 std::vector<bool> is_worker_idle_;
194 //boost::dynamic_bitset<> is_worker_idle_;
195
197 typedef std::map<size_t, std::pair<Param_t, reqVarContainer_t> > Jobs_t;
201
202 //DEBUG
203 boost::scoped_ptr<Trace> job_trace_;
204
205private:
206 void setup(functionDictionary_t known_expr_funcs,
207 const std::map<std::string, std::string> &userVariables) {
208 global_rank_ = comm_->globalRank();
209
210 if(global_rank_ == 0) {
211 std::cout << "\033[01;35m";
212 std::cout << " _ _ _ _ " << std::endl;
213 std::cout << " | | (_) | | | " << std::endl;
214 std::cout << " ___ _ __ | |_ ______ _ __ _| | ___ | |_ " << std::endl;
215 std::cout << " / _ \\| '_ \\| __|______| '_ \\| | |/ _ \\| __|" << std::endl;
216 std::cout << "| (_) | |_) | |_ | |_) | | | (_) | |_ " << std::endl;
217 std::cout << " \\___/| .__/ \\__| | .__/|_|_|\\___/ \\__|" << std::endl;
218 std::cout << " | | | | " << std::endl;
219 std::cout << " |_| |_| " << std::endl;
220 // ADA std::cout << "☷ Version: \t" << PACKAGE_VERSION << std::endl;
221 //std::cout << "☷ Git: \t\t" << GIT_VERSION << std::endl;
222 //std::cout << "☷ Build Date: \t" << BUILD_DATE << std::endl;
223 std::cout << "\e[0m";
224 std::cout << std::endl;
225 }
226
227 MPI_Barrier(MPI_COMM_WORLD);
228 parseInputFile(known_expr_funcs, true);
229
230 // here the control flow starts to diverge
231 if ( comm_->isOptimizer() ) { startOptimizer(); }
232 else if ( comm_->isWorker() ) { startWorker(userVariables); }
233 else if ( comm_->isPilot() ) { startPilot(); }
234 }
235
236protected:
237
238 void parseInputFile(functionDictionary_t /*known_expr_funcs*/, bool isOptimizationRun) {
239
240 try {
241 input_file_ = cmd_args_->getArg<std::string>("inputfile", true);
242 } catch (OptPilotException &e) {
243 std::cout << "Could not find 'inputfile' in arguments.. Aborting."
244 << std::endl;
245 MPI_Abort(comm_m, -101);
246 }
247
248 if((isOptimizationRun && objectives_.size() == 0) || dvars_.size() == 0) {
249 throw OptPilotException("Pilot::Pilot()",
250 "No objectives or dvars specified");
251 }
252
253 if(global_rank_ == 0) {
254 std::ostringstream os;
255 os << "\033[01;35m";
256 os << " ✔ " << objectives_.size()
257 << " objectives" << std::endl;
258 if (isOptimizationRun) {
259 os << " ✔ " << constraints_.size()
260 << " constraints" << std::endl;
261 }
262 os << " ✔ " << dvars_.size()
263 << " dvars" << std::endl;
264 os << "\e[0m";
265 os << std::endl;
266 std::cout << os.str() << std::flush;
267 }
268
269 MPI_Barrier(MPI_COMM_WORLD);
270 }
271
272 virtual
274
275 std::ostringstream os;
276 os << "\033[01;35m" << " " << global_rank_ << " (PID: " << getpid() << ") ▶ Opt"
277 << "\e[0m" << std::endl;
278 std::cout << os.str() << std::flush;
279
280 boost::scoped_ptr<Opt_t> opt(
281 new Opt_t(objectives_, constraints_, dvars_, objectives_.size(),
282 comm_->getBundle(), cmd_args_, hypervolRef_, comm_->getNrWorkerGroups()));
283 opt->initialize();
284
285 std::cout << "Stop Opt.." << std::endl;
286 }
287
    /// Runs the worker role on this rank: derives the simulation name from
    /// the input file name and constructs a Worker that then executes
    /// dispatched simulations until told to stop.
    virtual
    void startWorker(const std::map<std::string, std::string> &userVariables) {

        std::ostringstream os;
        os << "\033[01;35m" << " " << global_rank_ << " (PID: " << getpid() << ") ▶ Worker"
           << "\e[0m" << std::endl;
        std::cout << os.str() << std::flush;

        // strip directory part and extension: "path/to/sim.tmpl" -> "sim"
        size_t pos = input_file_.find_last_of("/");
        std::string tmplfile = input_file_;
        if(pos != std::string::npos)
            tmplfile = input_file_.substr(pos+1);
        pos = tmplfile.find(".");
        std::string simName = tmplfile.substr(0,pos);

        // NOTE(review): the 'new Worker<Sim_t>(...)' expression is
        // truncated in this copy — the line carrying the constructor call
        // and its leading arguments (presumably simName plus the problem
        // definition) is missing and must be restored from the original
        // file before this compiles.
        boost::scoped_ptr< Worker<Sim_t> > w(
                              comm_->getBundle(), cmd_args_, userVariables));

        std::cout << "Stop Worker.." << std::endl;
    }
309
310 virtual
311 void startPilot() {
312
313 std::ostringstream os;
314 os << "\033[01;35m" << " " << global_rank_ << " (PID: " << getpid() << ") ▶ Pilot"
315 << "\e[0m" << std::endl;
316 std::cout << os.str() << std::flush;
317
318 // Traces
319 std::ostringstream trace_filename;
320 trace_filename << "pilot.trace." << comm_->getBundle().island_id;
321 job_trace_.reset(new Trace("Optimizer Job Trace"));
322 job_trace_->registerComponent( "sink",
323 boost::shared_ptr<TraceComponent>(new FileSink(trace_filename.str())));
324
325 worker_comm_ = comm_->getBundle().worker;
326 opt_comm_ = comm_->getBundle().opt;
327 coworker_comm_ = comm_->getBundle().world;
328
330 MPI_Comm_rank(worker_comm_, &my_rank_in_worker_comm_);
332 MPI_Comm_rank(opt_comm_, &my_rank_in_opt_comm_);
333
335 MPI_Comm_size(worker_comm_, &total_available_workers_);
338
339 // setup master network
340 num_coworkers_ = 0;
341 MPI_Comm_size(coworker_comm_, &num_coworkers_);
342 if(num_coworkers_ > 1) {
343 //FIXME: proper upper bound for window size
344 int alpha = cmd_args_->getArg<int>("initialPopulation", false);
345 int opt_size = objectives_.size() + constraints_.size();
346 int overhead = 10;
347 size_t upperbound_buffer_size =
348 sizeof(double) * alpha * (1 + opt_size) * 1000
349 + overhead;
350 master_node_.reset(
351 new MasterNode< typename Opt_t::SolutionState_t,
352 SolPropagationGraph_t >(
353 coworker_comm_, upperbound_buffer_size, objectives_.size(),
354 comm_->getBundle().island_id));
355 }
356
357 has_opt_converged_ = false;
358 continue_polling_ = true;
359 run();
360
361 std::cout << "Stop Pilot.." << std::endl;
362 }
363
364 virtual
366 {}
367
    /// Executed at the top of every polling iteration, before checking for
    /// new requests (subclass hook; no-op here).
    virtual
    void prePoll()
    {}
371
    /// Called when the poller receives a STOP request, allowing subclasses
    /// to react before the loop ends (no-op here).
    virtual
    void onStop()
    {}
375
376 virtual
377 void postPoll() {
378 // terminating all workers is tricky since we do not know their state.
379 // All workers are notified (to terminate) when opt has converged and
380 // all workers are idle.
381 bool all_worker_idle = true;
382
383 // in the case where new requests became available after worker
384 // delivered last results (and switched to idle state).
385 for(int i = 0; i < total_available_workers_; i++) {
386
387 if(i == my_rank_in_worker_comm_) continue;
388
389 all_worker_idle = all_worker_idle && is_worker_idle_[i];
390
391 if(is_worker_idle_[i] && !request_queue_.empty())
393 }
394
395 // when all workers have been notified we can stop polling
396 if(all_worker_idle && has_opt_converged_) {
397 continue_polling_ = false;
398 int dummy = 0;
399 for(int worker = 0; worker < total_available_workers_; worker++) {
400 MPI_Request req;
401 MPI_Isend(&dummy, 1, MPI_INT, worker,
403 }
404 }
405 }
406
407
408 virtual
409 void sendNewJobToWorker(int worker) {
410
411 // no new jobs once our opt has converged
412 if(has_opt_converged_) return;
413
414 JobIter_t job = request_queue_.begin();
415 size_t jid = job->first;
416
417 Param_t job_params = job->second.first;
418 MPI_Send(&jid, 1, MPI_UNSIGNED_LONG, worker, MPI_WORK_JOBID_TAG, worker_comm_);
419 MPI_Send_params(job_params, worker, worker_comm_);
420
421 //reqVarContainer_t job_reqvars = job->second.second;
422 //MPI_Send_reqvars(job_reqvars, worker, worker_comm_);
423
424 running_job_list_.insert(std::pair<size_t,
425 std::pair<Param_t, reqVarContainer_t> >(job->first, job->second));
426 request_queue_.erase(jid);
427 is_worker_idle_[worker] = false;
428
429 std::ostringstream dump;
430 dump << "sent job with ID " << jid << " to worker " << worker
431 << std::endl;
432 job_trace_->log(dump);
433
434 }
435
436
    /// Central message dispatcher of the pilot's polling loop. Handles
    /// messages from workers (finished jobs, status updates), from the
    /// optimizer (new jobs, solution-state exchange, convergence) and from
    /// co-pilots. Returns true to keep polling, false to stop.
    /// NOTE(review): several lines of this function are missing in this
    /// copy (truncated MPI calls and two case labels) — each spot is
    /// flagged below and must be restored from the original file.
    virtual
    bool onMessage(MPI_Status status, size_t recv_value){

        MPITag_t tag = MPITag_t(status.MPI_TAG);
        switch(tag) {

        case WORKER_FINISHED_TAG: {

            // worker reports a finished job; recv_value carries the job ID
            size_t job_id = recv_value;

            // acknowledge so the worker can deliver the result data
            // NOTE(review): truncated MPI_Send — the trailing arguments
            // (presumably MPI_WORKER_FINISHED_ACK_TAG, worker_comm_) and
            // the declaration of 'res' (reqVarContainer_t) are missing in
            // this copy.
            size_t dummy = 1;
            MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, status.MPI_SOURCE,

            MPI_Recv_reqvars(res, status.MPI_SOURCE, worker_comm_);

            running_job_list_.erase(job_id);
            is_worker_idle_[status.MPI_SOURCE] = true;

            std::ostringstream dump;
            dump << "worker finished job with ID " << job_id << std::endl;
            job_trace_->log(dump);


            // optimizer already terminated, cannot accept new messages
            if(has_opt_converged_) return true;

            // forward the result to the optimizer master
            // NOTE(review): truncated MPI_Send — trailing arguments
            // (presumably MPI_OPT_JOB_FINISHED_TAG, opt_comm_) missing in
            // this copy.
            int opt_master_rank = comm_->getLeader();
            MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, opt_master_rank,

            MPI_Send_reqvars(res, opt_master_rank, opt_comm_);

            // we keep worker busy _after_ results have been sent to optimizer
            if(!request_queue_.empty())
                sendNewJobToWorker(status.MPI_SOURCE);

            return true;
        }

        case OPT_NEW_JOB_TAG: {

            // optimizer submits a new simulation request; queue it until a
            // worker becomes idle
            size_t job_id = recv_value;
            int opt_master_rank = comm_->getLeader();

            Param_t job_params;
            MPI_Recv_params(job_params, (size_t)opt_master_rank, opt_comm_);

            reqVarContainer_t reqVars;
            //MPI_Recv_reqvars(reqVars, (size_t)opt_master_rank, job_size, opt_comm_);

            std::pair<Param_t, reqVarContainer_t> job =
                std::pair<Param_t, reqVarContainer_t>(job_params, reqVars);
            request_queue_.insert(
                std::pair<size_t, std::pair<Param_t, reqVarContainer_t> >(
                    job_id, job));

            std::ostringstream dump;
            dump << "new opt job with ID " << job_id << std::endl;
            job_trace_->log(dump);

            return true;
        }

        // NOTE(review): the case label is missing here in this copy —
        // given the body (solution-state exchange between master islands)
        // it is presumably 'case EXCHANGE_SOL_STATE_TAG: {'.

            if(num_coworkers_ <= 1) return true;

            std::ostringstream dump;
            dump << "starting solution exchange.. " << status.MPI_SOURCE << std::endl;
            job_trace_->log(dump);

            // we start by storing our local solution state
            size_t buffer_size = recv_value;
            int opt_master_rank = status.MPI_SOURCE; //comm_->getLeader();

            // NOTE(review): truncated MPI_Recv — trailing arguments
            // (presumably MPI_EXCHANGE_SOL_STATE_DATA_TAG, the
            // communicator and a status argument) missing in this copy.
            char *buffer = new char[buffer_size];
            MPI_Recv(buffer, buffer_size, MPI_CHAR, opt_master_rank,
            master_node_->store(buffer, buffer_size);
            delete[] buffer;

            // clear() resets stream flags; str(std::string()) empties the
            // buffer so 'dump' can be reused
            dump.clear();
            dump.str(std::string());
            dump << "getting " << buffer_size << " bytes from OPT "
                 << opt_master_rank << std::endl;
            job_trace_->log(dump);

            // and then continue collecting all other solution states
            std::ostringstream states;
            master_node_->collect(states);
            buffer_size = states.str().length();

            dump.clear();
            dump.str(std::string());
            dump << "collected solution states of other PILOTS: "
                 << buffer_size << " bytes" << std::endl;
            job_trace_->log(dump);

            // send collected solution states to optimizer;
            // NOTE(review): truncated MPI_Send — trailing arguments
            // (presumably MPI_EXCHANGE_SOL_STATE_RES_SIZE_TAG and the
            // communicator) missing in this copy.
            MPI_Send(&buffer_size, 1, MPI_UNSIGNED_LONG, opt_master_rank,

            // NOTE(review): truncated MPI_Send — trailing arguments
            // (presumably MPI_EXCHANGE_SOL_STATE_RES_TAG and the
            // communicator) missing in this copy.
            buffer = new char[buffer_size];
            memcpy(buffer, states.str().c_str(), buffer_size);
            MPI_Send(buffer, buffer_size, MPI_CHAR, opt_master_rank,

            dump.clear();
            dump.str(std::string());
            dump << "sent set of new solutions to OPT" << std::endl;
            job_trace_->log(dump);

            delete[] buffer;

            return true;
        }

        case OPT_CONVERGED_TAG: {
            return stop();
        }

        // NOTE(review): the case label is missing here in this copy —
        // presumably 'case WORKER_STATUSUPDATE_TAG: {' given the body.
            is_worker_idle_[status.MPI_SOURCE] = true;
            return true;
        }

        default: {
            // NOTE(review): 'msg += status.MPI_TAG' appends the int as a
            // single character (its code point), not as digits — should be
            // msg += std::to_string(status.MPI_TAG).
            std::string msg = "(Pilot) Error: unexpected MPI_TAG: ";
            msg += status.MPI_TAG;
            throw OptPilotException("Pilot::onMessage", msg);
        }
        }
    }
572
    /// Marks the optimization as converged, drops all queued requests and
    /// asynchronously notifies the optimizer master to stop; when isOpt is
    /// true and the 'one-pilot-converge' option is set, also propagates
    /// the convergence to all co-pilots. Always returns true (so callers
    /// in onMessage can 'return stop();').
    bool stop(bool isOpt = true) {

        if(has_opt_converged_) return true;

        has_opt_converged_ = true;
        request_queue_.clear();
        size_t dummy = 0;
        MPI_Request req;
        // NOTE(review): fire-and-forget Isend — the request handle is
        // never completed or freed, and 'dummy' is a stack local that may
        // leave scope before the send completes; consider MPI_Request_free
        // and a buffer with sufficient lifetime.
        MPI_Isend(&dummy, 1, MPI_UNSIGNED_LONG, comm_->getLeader(), MPI_STOP_TAG, opt_comm_, &req);

        if(! isOpt) return true;
        if(num_coworkers_ <= 1) return true;

        if(! cmd_args_->getArg<bool>("one-pilot-converge", false, false))
            return true;

        // propagate converged message to other pilots
        // FIXME what happens if two island converge at the same time?
        int my_rank = 0;
        MPI_Comm_rank(coworker_comm_, &my_rank);
        for(int i=0; i < num_coworkers_; i++) {
            if(i == my_rank) continue;
            MPI_Request req;
            MPI_Isend(&dummy, 1, MPI_UNSIGNED_LONG, i, OPT_CONVERGED_TAG, coworker_comm_, &req);
        }

        return true;
    }
601
602
    // we overwrite run here to handle polling on two different communicators
    //XXX: would be nice to give the poller interface an array of comms and
    //     listeners to be called..
    /// Main polling loop of the pilot: posts non-blocking receives on the
    /// optimizer and worker communicators (and, when 'one-pilot-converge'
    /// is set, on the co-pilot communicator), tests each in turn and hands
    /// completed messages to onMessage(). The loop ends on MPI_STOP_TAG,
    /// when onMessage() returns false, or when continue_polling_ is
    /// cleared (see postPoll()).
    void run() {

        MPI_Request opt_request;
        MPI_Request worker_request;
        MPI_Status status;
        int flag = 0;
        size_t recv_value_worker = 0;
        size_t recv_value_opt = 0;

        setupPoll();

        MPI_Irecv(&recv_value_opt, 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE,
                  MPI_ANY_TAG, opt_comm_, &opt_request);
        MPI_Irecv(&recv_value_worker, 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE,
                  MPI_ANY_TAG, worker_comm_, &worker_request);

        // the pending_* flags track whether an Irecv is still outstanding
        // so it can be cancelled on loop exit (note that a successful
        // MPI_Test also resets the request handle to MPI_REQUEST_NULL)
        bool pending_opt_request = true;
        bool pending_worker_request = true;
        bool pending_pilot_request = false;

        MPI_Request pilot_request;
        size_t recv_value_pilot = 0;
        if(cmd_args_->getArg<bool>("one-pilot-converge", false, false)) {
            MPI_Irecv(&recv_value_pilot, 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE,
                      MPI_ANY_TAG, coworker_comm_, &pilot_request);
            pending_pilot_request = true;
        }

        while(continue_polling_) {

            prePoll();

            // 1) messages from the optimizer
            if(opt_request != MPI_REQUEST_NULL) {
                MPI_Test(&opt_request, &flag, &status);
                if(flag) {
                    pending_opt_request = false;
                    if(status.MPI_TAG == MPI_STOP_TAG) {
                        // NOTE(review): this early return skips the
                        // MPI_Cancel calls at the bottom, leaving the other
                        // outstanding Irecvs uncancelled
                        return;
                    } else {
                        // re-arm the receive while onMessage wants more
                        if(onMessage(status, recv_value_opt)) {
                            MPI_Irecv(&recv_value_opt, 1, MPI_UNSIGNED_LONG,
                                      MPI_ANY_SOURCE, MPI_ANY_TAG, opt_comm_,
                                      &opt_request);
                            pending_opt_request = true;
                        } else
                            return;
                    }
                }
            }

            // 2) messages from workers
            if(worker_request != MPI_REQUEST_NULL) {
                MPI_Test(&worker_request, &flag, &status);
                if(flag) {
                    pending_worker_request = false;
                    if(status.MPI_TAG == MPI_STOP_TAG) {
                        return;
                    } else {
                        if(onMessage(status, recv_value_worker)) {
                            MPI_Irecv(&recv_value_worker, 1,
                                      MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG,
                                      worker_comm_, &worker_request);
                            pending_worker_request = true;
                        } else
                            return;
                    }
                }
            }

            // 3) convergence notifications from co-pilots (optional)
            if(cmd_args_->getArg<bool>("one-pilot-converge", false, false)) {
                if(pilot_request != MPI_REQUEST_NULL) {
                    MPI_Test(&pilot_request, &flag, &status);
                    if(flag) {
                        pending_pilot_request = false;
                        if(status.MPI_TAG == OPT_CONVERGED_TAG) {
                            // another island converged: stop without
                            // re-propagating (isOpt = false)
                            stop(false);
                        } else {
                            MPI_Irecv(&recv_value_pilot, 1,
                                      MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG,
                                      coworker_comm_, &pilot_request);
                            pending_pilot_request = true;
                        }
                    }
                }
            }

            postPoll();
        }

        // cancel any receives still outstanding before leaving
        if(pending_opt_request) MPI_Cancel( &opt_request );
        if(pending_worker_request) MPI_Cancel( &worker_request );
        if(pending_pilot_request) MPI_Cancel( &pilot_request );
    }
698
699};
700
701#endif
Inform & endl(Inform &inf)
Definition: Inform.cpp:42
#define MPI_WORK_JOBID_TAG
unique id of the job
Definition: MPIHelper.h:52
#define MPI_EXCHANGE_SOL_STATE_DATA_TAG
Definition: MPIHelper.h:59
#define MPI_OPT_JOB_FINISHED_TAG
pilot tells optimizer that results are ready to collect
Definition: MPIHelper.h:46
#define MPI_EXCHANGE_SOL_STATE_RES_SIZE_TAG
Definition: MPIHelper.h:60
#define MPI_EXCHANGE_SOL_STATE_RES_TAG
Definition: MPIHelper.h:61
MPITag_t
Definition: MPIHelper.h:71
@ WORKER_FINISHED_TAG
Definition: MPIHelper.h:72
@ OPT_CONVERGED_TAG
Definition: MPIHelper.h:74
@ OPT_NEW_JOB_TAG
Definition: MPIHelper.h:73
@ WORKER_STATUSUPDATE_TAG
Definition: MPIHelper.h:75
@ EXCHANGE_SOL_STATE_TAG
Definition: MPIHelper.h:77
#define MPI_WORKER_FINISHED_ACK_TAG
pilot notifies worker that he is ready to collect the results
Definition: MPIHelper.h:37
#define MPI_STOP_TAG
global stop tag to exit poll loop (
Definition: MPIHelper.h:64
void MPI_Send_reqvars(reqVarContainer_t reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:146
void MPI_Send_params(Param_t params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:87
void MPI_Recv_reqvars(reqVarContainer_t &reqvars, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:165
void MPI_Recv_params(Param_t &params, size_t pid, MPI_Comm comm)
Definition: MPIHelper.cpp:128
std::map< std::string, DVar_t > DVarContainer_t
Definition: Types.h:92
std::map< std::string, reqVarInfo_t > reqVarContainer_t
Definition: Types.h:79
namedVariableCollection_t Param_t
Definition: Types.h:48
boost::shared_ptr< CmdArguments > CmdArguments_t
Definition: CmdArguments.h:176
std::map< std::string, client::function::type > functionDictionary_t
Definition: Expression.h:56
std::map< std::string, Expressions::Expr_t * > Named_t
type of an expressions with a name
Definition: Expression.h:74
constexpr double alpha
The fine structure constant, no dimension.
Definition: Physics.h:72
constexpr double e
The value of.
Definition: Physics.h:39
std::string::iterator iterator
Definition: MSLang.h:16
Definition: Pilot.h:102
bool has_opt_converged_
Definition: Pilot.h:183
CmdArguments_t cmd_args_
Definition: Pilot.h:167
int total_available_workers_
Definition: Pilot.h:182
boost::shared_ptr< Comm_t > comm_
Definition: Pilot.h:166
MPI_Comm coworker_comm_
MPI communicator used for messages between all pilots.
Definition: Pilot.h:164
void setup(functionDictionary_t known_expr_funcs, const std::map< std::string, std::string > &userVariables)
Definition: Pilot.h:206
std::string input_file_
input file for simulation with embedded optimization problem
Definition: Pilot.h:180
std::map< size_t, std::pair< Param_t, reqVarContainer_t > > Jobs_t
keep track of requests and running jobs
Definition: Pilot.h:197
virtual void setupPoll()
executed before starting polling loop
Definition: Pilot.h:365
Jobs_t request_queue_
Definition: Pilot.h:200
std::vector< bool > is_worker_idle_
Definition: Pilot.h:193
virtual void startPilot()
Definition: Pilot.h:311
int global_rank_
Definition: Pilot.h:169
virtual void startWorker(const std::map< std::string, std::string > &userVariables)
Definition: Pilot.h:289
virtual ~Pilot()
Definition: Pilot.h:147
virtual void startOptimizer()
Definition: Pilot.h:273
bool stop(bool isOpt=true)
Definition: Pilot.h:573
virtual void prePoll()
executed before checking for new request
Definition: Pilot.h:369
Jobs_t::iterator JobIter_t
Definition: Pilot.h:198
int my_rank_in_worker_comm_
Definition: Pilot.h:170
Expressions::Named_t constraints_
constraints
Definition: Pilot.h:187
virtual void onStop()
enable implementation to react to STOP tag
Definition: Pilot.h:373
int my_rank_in_opt_comm_
Definition: Pilot.h:171
virtual void sendNewJobToWorker(int worker)
Definition: Pilot.h:409
MasterNode< typename Opt_t::SolutionState_t, SolPropagationGraph_t > MasterNode_t
Definition: Pilot.h:176
int num_coworkers_
Definition: Pilot.h:173
bool continue_polling_
Definition: Pilot.h:184
std::vector< double > hypervolRef_
hypervolume reference point
Definition: Pilot.h:189
Pilot(CmdArguments_t args, boost::shared_ptr< Comm_t > comm, functionDictionary_t known_expr_funcs, const DVarContainer_t &dvar, const Expressions::Named_t &obj, const Expressions::Named_t &cons, std::vector< double > hypervolRef={}, bool isOptimizerRun=true, const std::map< std::string, std::string > &userVariables={})
Definition: Pilot.h:127
MPI_Comm opt_comm_
MPI communicator used for messages to/from optimizer.
Definition: Pilot.h:162
boost::scoped_ptr< Trace > job_trace_
Definition: Pilot.h:203
Expressions::Named_t objectives_
objectives
Definition: Pilot.h:186
Pilot(CmdArguments_t args, boost::shared_ptr< Comm_t > comm, functionDictionary_t known_expr_funcs)
Definition: Pilot.h:118
Pilot(CmdArguments_t args, boost::shared_ptr< Comm_t > comm, const DVarContainer_t &dvar)
Definition: Pilot.h:108
MPI_Comm worker_comm_
MPI communicator used for messages to/from worker.
Definition: Pilot.h:160
virtual void postPoll()
executed after handling (if any) new request
Definition: Pilot.h:377
DVarContainer_t dvars_
design variables
Definition: Pilot.h:188
void run()
Definition: Pilot.h:606
boost::scoped_ptr< MasterNode_t > master_node_
Definition: Pilot.h:177
virtual bool onMessage(MPI_Status status, size_t recv_value)
Definition: Pilot.h:438
Jobs_t running_job_list_
Definition: Pilot.h:199
void parseInputFile(functionDictionary_t, bool isOptimizationRun)
Definition: Pilot.h:238
Definition: Poller.h:35
Poller(MPI_Comm comm, double delay=0.1)
Definition: Poller.h:39
MPI_Comm comm_m
communicator the poller listens to requests
Definition: Poller.h:52
Definition: Worker.h:45
Definition: Trace.h:33