pion-net
4.0.9
|
00001 // ----------------------------------------------------------------------- 00002 // pion-common: a collection of common libraries used by the Pion Platform 00003 // ----------------------------------------------------------------------- 00004 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com) 00005 // 00006 // Distributed under the Boost Software License, Version 1.0. 00007 // See http://www.boost.org/LICENSE_1_0.txt 00008 // 00009 00010 #ifndef __PION_PIONLOCKEDQUEUE_HEADER__ 00011 #define __PION_PIONLOCKEDQUEUE_HEADER__ 00012 00013 #include <new> 00014 #include <boost/cstdint.hpp> 00015 #include <boost/noncopyable.hpp> 00016 #include <boost/thread/thread.hpp> 00017 #include <boost/thread/mutex.hpp> 00018 #include <boost/thread/condition.hpp> 00019 #include <boost/detail/atomic_count.hpp> 00020 #include <pion/PionConfig.hpp> 00021 #include <pion/PionException.hpp> 00022 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER) 00023 #include <boost/lockfree/detail/freelist.hpp> 00024 #endif 00025 00026 00027 // NOTE: the data structures contained in this file are based upon algorithms 00028 // published in the paper "Simple, Fast, and Practical Non-Blocking and Blocking 00029 // Concurrent Queue Algorithms" (1996, Maged M. Michael and Michael L. Scott, 00030 // Department of Computer Science, University of Rochester). 00031 // See http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf 00032 00033 00034 namespace pion { // begin namespace pion 00035 00036 00040 template <typename T, 00041 boost::uint32_t MaxSize = 250000, 00042 boost::uint32_t SleepMilliSec = 10 > 00043 class PionLockedQueue : 00044 private boost::noncopyable 00045 { 00046 protected: 00047 00049 struct QueueNode { 00050 T data; //< data wrapped by the node item 00051 QueueNode * next; //< points to the next node in the queue 00052 boost::uint32_t version; //< the node item's version number 00053 }; 00054 00056 inline QueueNode *createNode(void) { 00057 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER) 00058 return new (m_free_list.allocate()) QueueNode(); 00059 #else 00060 return new QueueNode(); 00061 #endif 00062 } 00063 00065 inline void destroyNode(QueueNode *node_ptr) { 00066 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER) 00067 node_ptr->~QueueNode(); 00068 m_free_list.deallocate(node_ptr); 00069 #else 00070 delete node_ptr; 00071 #endif 00072 } 00073 00075 inline void initialize(void) { 00076 // initialize with a dummy node since m_head_ptr is always 00077 // pointing to the item before the head of the list 00078 m_head_ptr = m_tail_ptr = createNode(); 00079 m_head_ptr->next = NULL; 00080 m_head_ptr->version = 0; 00081 } 00082 00091 inline bool dequeue(T& t, boost::uint32_t& version) { 00092 // just return if the list is empty 00093 boost::mutex::scoped_lock head_lock(m_head_mutex); 00094 QueueNode *new_head_ptr = m_head_ptr->next; 00095 if (! new_head_ptr) { 00096 version = m_head_ptr->version; 00097 return false; 00098 } 00099 00100 // get a copy of the item at the head of the list 00101 version = new_head_ptr->version; 00102 t = new_head_ptr->data; 00103 00104 // update the pointer to the head of the list 00105 QueueNode *old_head_ptr = m_head_ptr; 00106 m_head_ptr = new_head_ptr; 00107 head_lock.unlock(); 00108 00109 // free the QueueNode for the old head of the list 00110 destroyNode(old_head_ptr); 00111 00112 // decrement size 00113 --m_size; 00114 00115 // item successfully dequeued 00116 return true; 00117 } 00118 00119 00120 public: 00121 00123 class ConsumerThread { 00124 public: 00125 00130 ConsumerThread(void) : m_is_running(true), m_next_ptr(NULL), 00131 m_wakeup_time(boost::posix_time::not_a_date_time) {} 00132 00139 template <typename DurationType> 00140 ConsumerThread(const DurationType& d) 00141 : m_is_running(true), m_next_ptr(NULL), m_wakeup_time(d) 00142 {} 00143 00145 inline bool isRunning(void) const { return m_is_running; } 00146 00148 inline void stop(void) { m_is_running = false; m_wakeup_event.notify_one(); } 00149 00151 inline void reset(void) { m_is_running = true; m_next_ptr = NULL; } 00152 00154 inline bool hasWakeupTimer(void) const { return !m_wakeup_time.is_not_a_date_time(); } 00155 00157 inline const boost::posix_time::time_duration& getWakeupTimer(void) const { 00158 return m_wakeup_time; 00159 } 00160 00161 private: 00162 00164 friend class PionLockedQueue; 00165 00166 volatile bool m_is_running; //< true while the thread is running/active 00167 ConsumerThread * m_next_ptr; //< pointer to the next idle thread 00168 boost::condition m_wakeup_event; //< triggered when a new item is available 00169 boost::posix_time::time_duration m_wakeup_time; //< inactivity wakeup timer duration 00170 }; 00171 00172 00174 PionLockedQueue(void) 00175 : m_head_ptr(NULL), m_tail_ptr(NULL), m_idle_ptr(NULL), 00176 m_next_version(1), m_size(0) 00177 { 00178 initialize(); 00179 } 00180 00182 virtual ~PionLockedQueue() { 00183 clear(); 00184 destroyNode(m_tail_ptr); 00185 } 00186 00188 inline bool empty(void) const { return (m_head_ptr->next == NULL); } 00189 00191 std::size_t size(void) const { 00192 return m_size; 00193 } 00194 00196 void clear(void) { 00197 boost::mutex::scoped_lock tail_lock(m_tail_mutex); 00198 boost::mutex::scoped_lock head_lock(m_head_mutex); 00199 // also delete dummy node and reinitialize it to clear old value 00200 while (m_head_ptr) { 00201 m_tail_ptr = m_head_ptr; 00202 m_head_ptr = m_head_ptr->next; 00203 destroyNode(m_tail_ptr); 00204 if (m_head_ptr) 00205 --m_size; 00206 } 00207 initialize(); 00208 } 00209 00215 void push(const T& t) { 00216 // sleep while MaxSize is exceeded 00217 if (MaxSize > 0) { 00218 boost::system_time wakeup_time; 00219 while (size() >= MaxSize) { 00220 wakeup_time = boost::get_system_time() 00221 + boost::posix_time::millisec(SleepMilliSec); 00222 boost::thread::sleep(wakeup_time); 00223 } 00224 } 00225 00226 // create a new list node for the queue item 00227 QueueNode *node_ptr = createNode(); 00228 node_ptr->data = t; 00229 node_ptr->next = NULL; 00230 node_ptr->version = 0; 00231 00232 // append node to the end of the list 00233 boost::mutex::scoped_lock tail_lock(m_tail_mutex); 00234 node_ptr->version = (m_next_version += 2); 00235 m_tail_ptr->next = node_ptr; 00236 00237 // update the tail pointer for the new node 00238 m_tail_ptr = node_ptr; 00239 00240 // increment size 00241 ++m_size; 00242 00243 // wake up an idle thread (if any) 00244 if (m_idle_ptr) { 00245 ConsumerThread *idle_ptr = m_idle_ptr; 00246 m_idle_ptr = m_idle_ptr->m_next_ptr; 00247 idle_ptr->m_wakeup_event.notify_one(); 00248 } 00249 } 00250 00261 bool pop(T& t, ConsumerThread& thread_info) { 00262 boost::uint32_t last_known_version; 00263 while (thread_info.isRunning()) { 00264 // try to get the next value 00265 if ( dequeue(t, last_known_version) ) 00266 return true; // got an item 00267 00268 // queue is empty 00269 boost::mutex::scoped_lock tail_lock(m_tail_mutex); 00270 if (m_tail_ptr->version == last_known_version) { 00271 // still empty after acquiring lock 00272 thread_info.m_next_ptr = m_idle_ptr; 00273 m_idle_ptr = & thread_info; 00274 // get wakeup time (if any) 00275 if (thread_info.hasWakeupTimer()) { 00276 // wait for an item to become available 00277 const boost::posix_time::ptime wakeup_time(boost::get_system_time() + thread_info.getWakeupTimer()); 00278 if (!thread_info.m_wakeup_event.timed_wait(tail_lock, wakeup_time)) 00279 return false; // timer expired if timed_wait() returns false 00280 } else { 00281 // wait for an item to become available 00282 thread_info.m_wakeup_event.wait(tail_lock); 00283 } 00284 } 00285 } 00286 return false; 00287 } 00288 00296 inline bool pop(T& t) { boost::uint32_t version; return dequeue(t, version); } 00297 00298 00299 private: 00300 00301 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER) 00302 00303 boost::lockfree::caching_freelist<QueueNode> m_free_list; 00304 #endif 00305 00307 boost::mutex m_head_mutex; 00308 00310 boost::mutex m_tail_mutex; 00311 00313 QueueNode * m_head_ptr; 00314 00316 QueueNode * m_tail_ptr; 00317 00319 ConsumerThread * m_idle_ptr; 00320 00322 boost::uint32_t m_next_version; 00323 00325 boost::detail::atomic_count m_size; 00326 }; 00327 00328 00329 } // end namespace pion 00330 00331 #endif