pion-net  4.0.9
PionScheduler.hpp
1 // -----------------------------------------------------------------------
2 // pion-common: a collection of common libraries used by the Pion Platform
3 // -----------------------------------------------------------------------
4 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com)
5 //
6 // Distributed under the Boost Software License, Version 1.0.
7 // See http://www.boost.org/LICENSE_1_0.txt
8 //
9 
10 #ifndef __PION_PIONSCHEDULER_HEADER__
11 #define __PION_PIONSCHEDULER_HEADER__
12 
13 #include <vector>
14 #include <boost/asio.hpp>
15 #include <boost/bind.hpp>
16 #include <boost/function/function0.hpp>
17 #include <boost/cstdint.hpp>
18 #include <boost/shared_ptr.hpp>
19 #include <boost/noncopyable.hpp>
20 #include <boost/thread/thread.hpp>
21 #include <boost/thread/mutex.hpp>
22 #include <boost/thread/xtime.hpp>
23 #include <boost/thread/condition.hpp>
24 #include <pion/PionConfig.hpp>
25 #include <pion/PionException.hpp>
26 #include <pion/PionLogger.hpp>
27 
28 
29 namespace pion { // begin namespace pion
30 
34 class PION_COMMON_API PionScheduler :
35  private boost::noncopyable
36 {
37 public:
38 
41  : m_logger(PION_GET_LOGGER("pion.PionScheduler")),
42  m_num_threads(DEFAULT_NUM_THREADS), m_active_users(0), m_is_running(false)
43  {}
44 
46  virtual ~PionScheduler() {}
47 
49  virtual void startup(void) {}
50 
52  virtual void shutdown(void);
53 
55  void join(void);
56 
60  void addActiveUser(void);
61 
63  void removeActiveUser(void);
64 
66  inline bool isRunning(void) const { return m_is_running; }
67 
69  inline void setNumThreads(const boost::uint32_t n) { m_num_threads = n; }
70 
72  inline boost::uint32_t getNumThreads(void) const { return m_num_threads; }
73 
75  inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; }
76 
78  inline PionLogger getLogger(void) { return m_logger; }
79 
81  virtual boost::asio::io_service& getIOService(void) = 0;
82 
88  virtual void post(boost::function0<void> work_func) {
89  getIOService().post(work_func);
90  }
91 
98  void keepRunning(boost::asio::io_service& my_service,
99  boost::asio::deadline_timer& my_timer);
100 
107  inline static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec) {
108  boost::xtime wakeup_time(getWakeupTime(sleep_sec, sleep_nsec));
109  boost::thread::sleep(wakeup_time);
110  }
111 
121  template <typename ConditionType, typename LockType>
122  inline static void sleep(ConditionType& wakeup_condition, LockType& wakeup_lock,
123  boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
124  {
125  boost::xtime wakeup_time(getWakeupTime(sleep_sec, sleep_nsec));
126  wakeup_condition.timed_wait(wakeup_lock, wakeup_time);
127  }
128 
129 
131  void processServiceWork(boost::asio::io_service& service);
132 
133 
134 protected:
135 
144  static boost::xtime getWakeupTime(boost::uint32_t sleep_sec,
145  boost::uint32_t sleep_nsec);
146 
148  virtual void stopServices(void) {}
149 
151  virtual void stopThreads(void) {}
152 
154  virtual void finishServices(void) {}
155 
157  virtual void finishThreads(void) {}
158 
159 
161  static const boost::uint32_t DEFAULT_NUM_THREADS;
162 
164  static const boost::uint32_t NSEC_IN_SECOND;
165 
167  static const boost::uint32_t MICROSEC_IN_SECOND;
168 
170  static const boost::uint32_t KEEP_RUNNING_TIMER_SECONDS;
171 
172 
174  boost::mutex m_mutex;
175 
178 
180  boost::condition m_no_more_active_users;
181 
183  boost::condition m_scheduler_has_stopped;
184 
186  boost::uint32_t m_num_threads;
187 
189  boost::uint32_t m_active_users;
190 
193 };
194 
195 
199 class PION_COMMON_API PionMultiThreadScheduler :
200  public PionScheduler
201 {
202 public:
203 
206 
209 
210 
211 protected:
212 
214  virtual void stopThreads(void) {
215  if (! m_thread_pool.empty()) {
216  PION_LOG_DEBUG(m_logger, "Waiting for threads to shutdown");
217 
218  // wait until all threads in the pool have stopped
219  boost::thread current_thread;
220  for (ThreadPool::iterator i = m_thread_pool.begin();
221  i != m_thread_pool.end(); ++i)
222  {
223  // make sure we do not call join() for the current thread,
224  // since this may yield "undefined behavior"
225  if (**i != current_thread) (*i)->join();
226  }
227  }
228  }
229 
231  virtual void finishThreads(void) { m_thread_pool.clear(); }
232 
233 
235  typedef std::vector<boost::shared_ptr<boost::thread> > ThreadPool;
236 
237 
240 };
241 
242 
246 class PION_COMMON_API PionSingleServiceScheduler :
248 {
249 public:
250 
253  : m_service(), m_timer(m_service)
254  {}
255 
257  virtual ~PionSingleServiceScheduler() { shutdown(); }
258 
260  virtual boost::asio::io_service& getIOService(void) { return m_service; }
261 
263  virtual void startup(void);
264 
265 
266 protected:
267 
269  virtual void stopServices(void) { m_service.stop(); }
270 
272  virtual void finishServices(void) { m_service.reset(); }
273 
274 
276  boost::asio::io_service m_service;
277 
279  boost::asio::deadline_timer m_timer;
280 };
281 
282 
286 class PION_COMMON_API PionOneToOneScheduler :
288 {
289 public:
290 
293  : m_service_pool(), m_next_service(0)
294  {}
295 
297  virtual ~PionOneToOneScheduler() { shutdown(); }
298 
300  virtual boost::asio::io_service& getIOService(void) {
301  boost::mutex::scoped_lock scheduler_lock(m_mutex);
302  while (m_service_pool.size() < m_num_threads) {
303  boost::shared_ptr<ServicePair> service_ptr(new ServicePair());
304  m_service_pool.push_back(service_ptr);
305  }
306  if (++m_next_service >= m_num_threads)
307  m_next_service = 0;
308  PION_ASSERT(m_next_service < m_num_threads);
309  return m_service_pool[m_next_service]->first;
310  }
311 
318  virtual boost::asio::io_service& getIOService(boost::uint32_t n) {
319  PION_ASSERT(n < m_num_threads);
320  PION_ASSERT(n < m_service_pool.size());
321  return m_service_pool[n]->first;
322  }
323 
325  virtual void startup(void);
326 
327 
328 protected:
329 
331  virtual void stopServices(void) {
332  for (ServicePool::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
333  (*i)->first.stop();
334  }
335  }
336 
338  virtual void finishServices(void) { m_service_pool.clear(); }
339 
340 
342  struct ServicePair {
343  ServicePair(void) : first(), second(first) {}
344  boost::asio::io_service first;
345  boost::asio::deadline_timer second;
346  };
347 
349  typedef std::vector<boost::shared_ptr<ServicePair> > ServicePool;
350 
351 
354 
356  boost::uint32_t m_next_service;
357 };
358 
359 
360 } // end namespace pion
361 
362 #endif
virtual void finishThreads(void)
finishes all threads used to perform work
static const boost::uint32_t NSEC_IN_SECOND
number of nanoseconds in one full second (10 ^ 9)
void setLogger(PionLogger log_ptr)
sets the logger to be used
boost::asio::io_service m_service
service used to manage async I/O events
PionScheduler(void)
constructs a new PionScheduler
virtual void startup(void)
Starts the thread scheduler (this is called automatically when necessary)
boost::condition m_no_more_active_users
condition triggered when there are no more active users
std::vector< boost::shared_ptr< boost::thread > > ThreadPool
typedef for a pool of worker threads
PionOneToOneScheduler(void)
constructs a new PionOneToOneScheduler
PionLogger m_logger
primary logging interface used by this class
std::vector< boost::shared_ptr< ServicePair > > ServicePool
typedef for a pool of IO services
virtual ~PionScheduler()
virtual destructor
bool m_is_running
true if the thread scheduler is running
virtual ~PionMultiThreadScheduler()
virtual destructor
static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
boost::uint32_t m_next_service
the next service to use for scheduling work
PionLogger getLogger(void)
returns the logger currently in use
ServicePool m_service_pool
pool of IO services used to schedule work
virtual void post(boost::function0< void > work_func)
bool isRunning(void) const
returns true if the scheduler is running
virtual ~PionOneToOneScheduler()
virtual destructor
virtual void finishServices(void)
finishes all services used to schedule work
virtual void finishServices(void)
finishes all services used to schedule work
void setNumThreads(const boost::uint32_t n)
sets the number of threads to be used (these are shared by all servers)
virtual void stopThreads(void)
stops all threads used to perform work
virtual boost::asio::io_service & getIOService(void)
returns an async I/O service used to schedule work
static const boost::uint32_t MICROSEC_IN_SECOND
number of microseconds in one full second (10 ^ 6)
typedef for a pair object where first is an IO service and second is a deadline timer ...
virtual void stopServices(void)
stops all services used to schedule work
virtual ~PionSingleServiceScheduler()
virtual destructor
static const boost::uint32_t DEFAULT_NUM_THREADS
default number of worker threads in the thread pool
boost::condition m_scheduler_has_stopped
condition triggered when the scheduler has stopped
boost::uint32_t m_num_threads
total number of worker threads in the pool
boost::mutex m_mutex
mutex to make class thread-safe
virtual void stopServices(void)
stops all services used to schedule work
boost::asio::deadline_timer m_timer
timer used to periodically check for shutdown
virtual boost::asio::io_service & getIOService(boost::uint32_t n)
virtual void stopServices(void)
stops all services used to schedule work
static void sleep(ConditionType &wakeup_condition, LockType &wakeup_lock, boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
virtual void finishServices(void)
finishes all services used to schedule work
PionSingleServiceScheduler(void)
constructs a new PionSingleServiceScheduler
virtual void finishThreads(void)
finishes all threads used to perform work
PionMultiThreadScheduler(void)
constructs a new PionMultiThreadScheduler
boost::uint32_t getNumThreads(void) const
returns the number of threads currently in use
ThreadPool m_thread_pool
pool of threads used to perform work
boost::uint32_t m_active_users
the scheduler will not shutdown until there are no more active users
virtual boost::asio::io_service & getIOService(void)
returns an async I/O service used to schedule work
static const boost::uint32_t KEEP_RUNNING_TIMER_SECONDS
number of seconds a timer should wait for to keep the IO services running
virtual void stopThreads(void)
stops all threads used to perform work