pion-net
4.0.9
|
00001 // ----------------------------------------------------------------------- 00002 // pion-common: a collection of common libraries used by the Pion Platform 00003 // ----------------------------------------------------------------------- 00004 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com) 00005 // 00006 // Distributed under the Boost Software License, Version 1.0. 00007 // See http://www.boost.org/LICENSE_1_0.txt 00008 // 00009 00010 #include <boost/date_time/posix_time/posix_time_duration.hpp> 00011 #include <pion/PionScheduler.hpp> 00012 00013 namespace pion { // begin namespace pion 00014 00015 00016 // static members of PionScheduler 00017 00018 const boost::uint32_t PionScheduler::DEFAULT_NUM_THREADS = 8; 00019 const boost::uint32_t PionScheduler::NSEC_IN_SECOND = 1000000000; // (10^9) 00020 const boost::uint32_t PionScheduler::MICROSEC_IN_SECOND = 1000000; // (10^6) 00021 const boost::uint32_t PionScheduler::KEEP_RUNNING_TIMER_SECONDS = 5; 00022 00023 00024 // PionScheduler member functions 00025 00026 void PionScheduler::shutdown(void) 00027 { 00028 // lock mutex for thread safety 00029 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00030 00031 if (m_is_running) { 00032 00033 PION_LOG_INFO(m_logger, "Shutting down the thread scheduler"); 00034 00035 while (m_active_users > 0) { 00036 // first, wait for any active users to exit 00037 PION_LOG_INFO(m_logger, "Waiting for " << m_active_users << " scheduler users to finish"); 00038 m_no_more_active_users.wait(scheduler_lock); 00039 } 00040 00041 // shut everything down 00042 m_is_running = false; 00043 stopServices(); 00044 stopThreads(); 00045 finishServices(); 00046 finishThreads(); 00047 00048 PION_LOG_INFO(m_logger, "The thread scheduler has shutdown"); 00049 00050 // Make sure anyone waiting on shutdown gets notified 00051 m_scheduler_has_stopped.notify_all(); 00052 00053 } else { 00054 00055 // stop and finish everything to be certain that no events are pending 00056 stopServices(); 00057 stopThreads(); 00058 finishServices(); 00059 finishThreads(); 00060 00061 // Make sure anyone waiting on shutdown gets notified 00062 // even if the scheduler did not startup successfully 00063 m_scheduler_has_stopped.notify_all(); 00064 } 00065 } 00066 00067 void PionScheduler::join(void) 00068 { 00069 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00070 while (m_is_running) { 00071 // sleep until scheduler_has_stopped condition is signaled 00072 m_scheduler_has_stopped.wait(scheduler_lock); 00073 } 00074 } 00075 00076 void PionScheduler::keepRunning(boost::asio::io_service& my_service, 00077 boost::asio::deadline_timer& my_timer) 00078 { 00079 if (m_is_running) { 00080 // schedule this again to make sure the service doesn't complete 00081 my_timer.expires_from_now(boost::posix_time::seconds(KEEP_RUNNING_TIMER_SECONDS)); 00082 my_timer.async_wait(boost::bind(&PionScheduler::keepRunning, this, 00083 boost::ref(my_service), boost::ref(my_timer))); 00084 } 00085 } 00086 00087 void PionScheduler::addActiveUser(void) 00088 { 00089 if (!m_is_running) startup(); 00090 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00091 ++m_active_users; 00092 } 00093 00094 void PionScheduler::removeActiveUser(void) 00095 { 00096 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00097 if (--m_active_users == 0) 00098 m_no_more_active_users.notify_all(); 00099 } 00100 00101 boost::xtime PionScheduler::getWakeupTime(boost::uint32_t sleep_sec, 00102 boost::uint32_t sleep_nsec) 00103 { 00104 boost::xtime wakeup_time; 00105 boost::xtime_get(&wakeup_time, boost::TIME_UTC); 00106 wakeup_time.sec += sleep_sec; 00107 wakeup_time.nsec += sleep_nsec; 00108 if (static_cast<boost::uint32_t>(wakeup_time.nsec) >= NSEC_IN_SECOND) { 00109 wakeup_time.sec++; 00110 wakeup_time.nsec -= NSEC_IN_SECOND; 00111 } 00112 return wakeup_time; 00113 } 00114 00115 void PionScheduler::processServiceWork(boost::asio::io_service& service) { 00116 while (m_is_running) { 00117 try { 00118 service.run(); 00119 } catch (std::exception& e) { 00120 PION_LOG_ERROR(m_logger, e.what()); 00121 } catch (...) { 00122 PION_LOG_ERROR(m_logger, "caught unrecognized exception"); 00123 } 00124 } 00125 } 00126 00127 00128 // PionSingleServiceScheduler member functions 00129 00130 void PionSingleServiceScheduler::startup(void) 00131 { 00132 // lock mutex for thread safety 00133 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00134 00135 if (! m_is_running) { 00136 PION_LOG_INFO(m_logger, "Starting thread scheduler"); 00137 m_is_running = true; 00138 00139 // schedule a work item to make sure that the service doesn't complete 00140 m_service.reset(); 00141 keepRunning(m_service, m_timer); 00142 00143 // start multiple threads to handle async tasks 00144 for (boost::uint32_t n = 0; n < m_num_threads; ++n) { 00145 boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&PionScheduler::processServiceWork, 00146 this, boost::ref(m_service)) )); 00147 m_thread_pool.push_back(new_thread); 00148 } 00149 } 00150 } 00151 00152 00153 // PionOneToOneScheduler member functions 00154 00155 void PionOneToOneScheduler::startup(void) 00156 { 00157 // lock mutex for thread safety 00158 boost::mutex::scoped_lock scheduler_lock(m_mutex); 00159 00160 if (! m_is_running) { 00161 PION_LOG_INFO(m_logger, "Starting thread scheduler"); 00162 m_is_running = true; 00163 00164 // make sure there are enough services initialized 00165 while (m_service_pool.size() < m_num_threads) { 00166 boost::shared_ptr<ServicePair> service_ptr(new ServicePair()); 00167 m_service_pool.push_back(service_ptr); 00168 } 00169 00170 // schedule a work item for each service to make sure that it doesn't complete 00171 for (ServicePool::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) { 00172 keepRunning((*i)->first, (*i)->second); 00173 } 00174 00175 // start multiple threads to handle async tasks 00176 for (boost::uint32_t n = 0; n < m_num_threads; ++n) { 00177 boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&PionScheduler::processServiceWork, 00178 this, boost::ref(m_service_pool[n]->first)) )); 00179 m_thread_pool.push_back(new_thread); 00180 } 00181 } 00182 } 00183 00184 00185 } // end namespace pion