pion-net
4.0.9
|
00001 // ----------------------------------------------------------------------- 00002 // pion-common: a collection of common libraries used by the Pion Platform 00003 // ----------------------------------------------------------------------- 00004 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com) 00005 // 00006 // Distributed under the Boost Software License, Version 1.0. 00007 // See http://www.boost.org/LICENSE_1_0.txt 00008 // 00009 00010 #ifndef __PION_PIONSCHEDULER_HEADER__ 00011 #define __PION_PIONSCHEDULER_HEADER__ 00012 00013 #include <vector> 00014 #include <boost/asio.hpp> 00015 #include <boost/bind.hpp> 00016 #include <boost/function/function0.hpp> 00017 #include <boost/cstdint.hpp> 00018 #include <boost/shared_ptr.hpp> 00019 #include <boost/noncopyable.hpp> 00020 #include <boost/thread/thread.hpp> 00021 #include <boost/thread/mutex.hpp> 00022 #include <boost/thread/xtime.hpp> 00023 #include <boost/thread/condition.hpp> 00024 #include <pion/PionConfig.hpp> 00025 #include <pion/PionException.hpp> 00026 #include <pion/PionLogger.hpp> 00027 00028 00029 namespace pion { // begin namespace pion 00030 00034 class PION_COMMON_API PionScheduler : 00035 private boost::noncopyable 00036 { 00037 public: 00038 00040 PionScheduler(void) 00041 : m_logger(PION_GET_LOGGER("pion.PionScheduler")), 00042 m_num_threads(DEFAULT_NUM_THREADS), m_active_users(0), m_is_running(false) 00043 {} 00044 00046 virtual ~PionScheduler() {} 00047 00049 virtual void startup(void) {} 00050 00052 virtual void shutdown(void); 00053 00055 void join(void); 00056 00060 void addActiveUser(void); 00061 00063 void removeActiveUser(void); 00064 00066 inline bool isRunning(void) const { return m_is_running; } 00067 00069 inline void setNumThreads(const boost::uint32_t n) { m_num_threads = n; } 00070 00072 inline boost::uint32_t getNumThreads(void) const { return m_num_threads; } 00073 00075 inline void setLogger(PionLogger log_ptr) { m_logger = log_ptr; } 00076 00078 inline PionLogger getLogger(void) { return m_logger; } 00079 00081 virtual boost::asio::io_service& getIOService(void) = 0; 00082 00088 virtual void post(boost::function0<void> work_func) { 00089 getIOService().post(work_func); 00090 } 00091 00098 void keepRunning(boost::asio::io_service& my_service, 00099 boost::asio::deadline_timer& my_timer); 00100 00107 inline static void sleep(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec) { 00108 boost::xtime wakeup_time(getWakeupTime(sleep_sec, sleep_nsec)); 00109 boost::thread::sleep(wakeup_time); 00110 } 00111 00121 template <typename ConditionType, typename LockType> 00122 inline static void sleep(ConditionType& wakeup_condition, LockType& wakeup_lock, 00123 boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec) 00124 { 00125 boost::xtime wakeup_time(getWakeupTime(sleep_sec, sleep_nsec)); 00126 wakeup_condition.timed_wait(wakeup_lock, wakeup_time); 00127 } 00128 00129 00131 void processServiceWork(boost::asio::io_service& service); 00132 00133 00134 protected: 00135 00144 static boost::xtime getWakeupTime(boost::uint32_t sleep_sec, 00145 boost::uint32_t sleep_nsec); 00146 00148 virtual void stopServices(void) {} 00149 00151 virtual void stopThreads(void) {} 00152 00154 virtual void finishServices(void) {} 00155 00157 virtual void finishThreads(void) {} 00158 00159 00161 static const boost::uint32_t DEFAULT_NUM_THREADS; 00162 00164 static const boost::uint32_t NSEC_IN_SECOND; 00165 00167 static const boost::uint32_t MICROSEC_IN_SECOND; 00168 00170 static const boost::uint32_t KEEP_RUNNING_TIMER_SECONDS; 00171 00172 00174 boost::mutex m_mutex; 00175 00177 PionLogger m_logger; 00178 00180 boost::condition m_no_more_active_users; 00181 00183 boost::condition m_scheduler_has_stopped; 00184 00186 boost::uint32_t m_num_threads; 00187 00189 boost::uint32_t m_active_users; 00190 00192 bool m_is_running; 00193 }; 00194 00195 00199 class PION_COMMON_API PionMultiThreadScheduler : 00200 public PionScheduler 00201 { 00202 public: 00203 00205 PionMultiThreadScheduler(void) {} 00206 00208 virtual ~PionMultiThreadScheduler() {} 00209 00210 00211 protected: 00212 00214 virtual void stopThreads(void) { 00215 if (! m_thread_pool.empty()) { 00216 PION_LOG_DEBUG(m_logger, "Waiting for threads to shutdown"); 00217 00218 // wait until all threads in the pool have stopped 00219 boost::thread current_thread; 00220 for (ThreadPool::iterator i = m_thread_pool.begin(); 00221 i != m_thread_pool.end(); ++i) 00222 { 00223 // make sure we do not call join() for the current thread, 00224 // since this may yield "undefined behavior" 00225 if (**i != current_thread) (*i)->join(); 00226 } 00227 } 00228 } 00229 00231 virtual void finishThreads(void) { m_thread_pool.clear(); } 00232 00233 00235 typedef std::vector<boost::shared_ptr<boost::thread> > ThreadPool; 00236 00237 00239 ThreadPool m_thread_pool; 00240 }; 00241 00242 00246 class PION_COMMON_API PionSingleServiceScheduler : 00247 public PionMultiThreadScheduler 00248 { 00249 public: 00250 00252 PionSingleServiceScheduler(void) 00253 : m_service(), m_timer(m_service) 00254 {} 00255 00257 virtual ~PionSingleServiceScheduler() { shutdown(); } 00258 00260 virtual boost::asio::io_service& getIOService(void) { return m_service; } 00261 00263 virtual void startup(void); 00264 00265 00266 protected: 00267 00269 virtual void stopServices(void) { m_service.stop(); } 00270 00272 virtual void finishServices(void) { m_service.reset(); } 00273 00274 00276 boost::asio::io_service m_service; 00277 00279 boost::asio::deadline_timer m_timer; 00280 }; 00281 00282 00286 class PION_COMMON_API PionOneToOneScheduler : 00287 public PionMultiThreadScheduler 00288 { 00289 public: 00290 00292 PionOneToOneScheduler(void) 00293 : m_service_pool(), m_next_service(0) 00294 {} 00295 00297 virtual ~PionOneToOneScheduler() { shutdown(); } 00298 00300 virtual boost::asio::io_service& getIOService(void) { 00301 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00302 while (m_service_pool.size() < m_num_threads) { 00303 boost::shared_ptr<ServicePair> service_ptr(new ServicePair()); 00304 m_service_pool.push_back(service_ptr); 00305 } 00306 if (++m_next_service >= m_num_threads) 00307 m_next_service = 0; 00308 PION_ASSERT(m_next_service < m_num_threads); 00309 return m_service_pool[m_next_service]->first; 00310 } 00311 00318 virtual boost::asio::io_service& getIOService(boost::uint32_t n) { 00319 PION_ASSERT(n < m_num_threads); 00320 PION_ASSERT(n < m_service_pool.size()); 00321 return m_service_pool[n]->first; 00322 } 00323 00325 virtual void startup(void); 00326 00327 00328 protected: 00329 00331 virtual void stopServices(void) { 00332 for (ServicePool::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) { 00333 (*i)->first.stop(); 00334 } 00335 } 00336 00338 virtual void finishServices(void) { m_service_pool.clear(); } 00339 00340 00342 struct ServicePair { 00343 ServicePair(void) : first(), second(first) {} 00344 boost::asio::io_service first; 00345 boost::asio::deadline_timer second; 00346 }; 00347 00349 typedef std::vector<boost::shared_ptr<ServicePair> > ServicePool; 00350 00351 00353 ServicePool m_service_pool; 00354 00356 boost::uint32_t m_next_service; 00357 }; 00358 00359 00360 } // end namespace pion 00361 00362 #endif