OPAL (Object Oriented Parallel Accelerator Library)  2021.1.99
OPAL
Poller.h
Go to the documentation of this file.
1 //
2 // Class Poller
3 // An interface implementing the basics of a poll loop, posting an
4 // MPI_Irecv and waiting for new requests on a specific communicator.
5 //
6 // @see Pilot
7 // @see Worker
8 // @see Optimizer
9 // @see MPIHelper.h
10 //
11 // Copyright (c) 2010 - 2013, Yves Ineichen, ETH Zürich
12 // All rights reserved
13 //
14 // Implemented as part of the PhD thesis
15 // "Toward massively parallel multi-objective optimization with application to
16 // particle accelerators" (https://doi.org/10.3929/ethz-a-009792359)
17 //
18 // This file is part of OPAL.
19 //
20 // OPAL is free software: you can redistribute it and/or modify
21 // it under the terms of the GNU General Public License as published by
22 // the Free Software Foundation, either version 3 of the License, or
23 // (at your option) any later version.
24 //
25 // You should have received a copy of the GNU General Public License
26 // along with OPAL. If not, see <https://www.gnu.org/licenses/>.
27 //
28 #ifndef __POLLER_H__
29 #define __POLLER_H__
30 
31 #include <iostream>
32 
33 #include "Util/MPIHelper.h"
34 
35 class Poller {
36 
37 public:
38 
39  Poller(MPI_Comm comm, double delay = 0.1)
40  : comm_m(comm)
41  , is_running_(false)
42  , poll_delay_(delay)
43  {
44  last_polled_ = MPI_Wtime();
45  }
46 
47  virtual ~Poller()
48  {}
49 
50 protected:
52  MPI_Comm comm_m;
53 
55 
57  double last_polled_;
59  double poll_delay_;
60 
65  virtual bool onMessage(MPI_Status status, size_t recv_value) = 0;
67  virtual void onStop() = 0;
68 
70  virtual void setupPoll() = 0;
72  virtual void prePoll() = 0;
74  virtual void postPoll() = 0;
75 
79  virtual void run() {
80  MPI_Status status;
81  MPI_Request req;
82  size_t recv_value = 0;
83  int flag = 0;
84 
85  setupPoll();
86 
87  MPI_Irecv(&recv_value, 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG,
88  comm_m, &req);
89 
90  is_running_ = true;
91 
92  while(true) {
93 
94  // regulate the amount of MPI_Test calls (expensive)
95  double tnow = MPI_Wtime();
96  if(tnow - last_polled_ > poll_delay_)
97  last_polled_ = tnow;
98  else
99  continue;
100 
101  prePoll();
102 
103  if(req != MPI_REQUEST_NULL) {
104  MPI_Test(&req, &flag, &status);
105  if(flag) {
106  if(status.MPI_TAG == MPI_STOP_TAG) {
107  is_running_ = false;
108  onStop();
109  return;
110  } else {
111  if(onMessage(status, recv_value))
112  MPI_Irecv(&recv_value, 1, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE,
113  MPI_ANY_TAG, comm_m, &req);
114  else
115  break;
116  }
117  }
118  }
119 
120  postPoll();
121  }
122  }
123 };
124 
125 #endif
#define MPI_STOP_TAG
global stop tag to exit poll loop (
Definition: MPIHelper.h:64
Definition: Poller.h:35
Poller(MPI_Comm comm, double delay=0.1)
Definition: Poller.h:39
bool is_running_
Definition: Poller.h:54
double poll_delay_
delay in seconds between polls
Definition: Poller.h:59
double last_polled_
time of last MPI_Test
Definition: Poller.h:57
virtual void prePoll()=0
executed before checking for new request
virtual bool onMessage(MPI_Status status, size_t recv_value)=0
virtual void setupPoll()=0
executed before starting polling loop
virtual void postPoll()=0
executed after handling (if any) new request
virtual void onStop()=0
enable implementation to react to STOP tag
virtual ~Poller()
Definition: Poller.h:47
MPI_Comm comm_m
communicator the poller listens to requests
Definition: Poller.h:52
virtual void run()
Definition: Poller.h:79