pion-net  4.0.9
common/include/pion/PionLockedQueue.hpp
00001 // -----------------------------------------------------------------------
00002 // pion-common: a collection of common libraries used by the Pion Platform
00003 // -----------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Distributed under the Boost Software License, Version 1.0.
00007 // See http://www.boost.org/LICENSE_1_0.txt
00008 //
00009 
00010 #ifndef __PION_PIONLOCKEDQUEUE_HEADER__
00011 #define __PION_PIONLOCKEDQUEUE_HEADER__
00012 
00013 #include <new>
00014 #include <boost/cstdint.hpp>
00015 #include <boost/noncopyable.hpp>
00016 #include <boost/thread/thread.hpp>
00017 #include <boost/thread/mutex.hpp>
00018 #include <boost/thread/condition.hpp>
00019 #include <boost/detail/atomic_count.hpp>
00020 #include <pion/PionConfig.hpp>
00021 #include <pion/PionException.hpp>
00022 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
00023     #include <boost/lockfree/detail/freelist.hpp>
00024 #endif
00025 
00026 
00027 // NOTE: the data structures contained in this file are based upon algorithms
00028 // published in the paper "Simple, Fast, and Practical Non-Blocking and Blocking
00029 // Concurrent Queue Algorithms" (1996, Maged M. Michael and Michael L. Scott,
00030 // Department of Computer Science, University of Rochester).
00031 // See http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
00032 
00033 
00034 namespace pion {    // begin namespace pion
00035 
00036 
00040 template <typename T,
00041     boost::uint32_t MaxSize = 250000,
00042     boost::uint32_t SleepMilliSec = 10 >
00043 class PionLockedQueue :
00044     private boost::noncopyable
00045 {
00046 protected:
00047 
00049     struct QueueNode {
00050         T       data;       //< data wrapped by the node item
00051         QueueNode * next;       //< points to the next node in the queue
00052         boost::uint32_t version;    //< the node item's version number
00053     };
00054 
00056     inline QueueNode *createNode(void) {
00057 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
00058         return new (m_free_list.allocate()) QueueNode();
00059 #else
00060         return new QueueNode();
00061 #endif
00062     }
00063 
00065     inline void destroyNode(QueueNode *node_ptr) {
00066 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
00067         node_ptr->~QueueNode();
00068         m_free_list.deallocate(node_ptr);
00069 #else
00070         delete node_ptr;
00071 #endif
00072     }
00073     
00075     inline void initialize(void) {
00076         // initialize with a dummy node since m_head_ptr is always 
00077         // pointing to the item before the head of the list
00078         m_head_ptr = m_tail_ptr = createNode();
00079         m_head_ptr->next = NULL;
00080         m_head_ptr->version = 0;
00081     }
00082     
00091     inline bool dequeue(T& t, boost::uint32_t& version) {
00092         // just return if the list is empty
00093         boost::mutex::scoped_lock head_lock(m_head_mutex);
00094         QueueNode *new_head_ptr = m_head_ptr->next;
00095         if (! new_head_ptr) {
00096             version = m_head_ptr->version;
00097             return false;
00098         }
00099 
00100         // get a copy of the item at the head of the list
00101         version = new_head_ptr->version;
00102         t = new_head_ptr->data;
00103 
00104         // update the pointer to the head of the list
00105         QueueNode *old_head_ptr = m_head_ptr;
00106         m_head_ptr = new_head_ptr;
00107         head_lock.unlock();
00108 
00109         // free the QueueNode for the old head of the list
00110         destroyNode(old_head_ptr);
00111 
00112         // decrement size
00113         --m_size;
00114 
00115         // item successfully dequeued
00116         return true;
00117     }
00118 
00119 
00120 public:
00121 
00123     class ConsumerThread {
00124     public:
00125 
00130         ConsumerThread(void) : m_is_running(true), m_next_ptr(NULL),
00131             m_wakeup_time(boost::posix_time::not_a_date_time) {}
00132 
00139         template <typename DurationType>
00140         ConsumerThread(const DurationType& d)
00141             : m_is_running(true), m_next_ptr(NULL), m_wakeup_time(d)
00142         {}
00143 
00145         inline bool isRunning(void) const { return m_is_running; }
00146 
00148         inline void stop(void) { m_is_running = false; m_wakeup_event.notify_one(); }
00149 
00151         inline void reset(void) { m_is_running = true; m_next_ptr = NULL; }
00152         
00154         inline bool hasWakeupTimer(void) const { return !m_wakeup_time.is_not_a_date_time(); }
00155         
00157         inline const boost::posix_time::time_duration& getWakeupTimer(void) const {
00158             return m_wakeup_time;
00159         }
00160 
00161     private:
00162 
00164         friend class PionLockedQueue;
00165 
00166         volatile bool       m_is_running;       //< true while the thread is running/active
00167         ConsumerThread *    m_next_ptr;     //< pointer to the next idle thread
00168         boost::condition    m_wakeup_event; //< triggered when a new item is available
00169         boost::posix_time::time_duration    m_wakeup_time;  //< inactivity wakeup timer duration
00170     };
00171 
00172 
00174     PionLockedQueue(void)
00175         : m_head_ptr(NULL), m_tail_ptr(NULL), m_idle_ptr(NULL),
00176         m_next_version(1), m_size(0)
00177     {
00178         initialize();
00179     }
00180     
00182     virtual ~PionLockedQueue() {
00183         clear();
00184         destroyNode(m_tail_ptr);
00185     }
00186     
00188     inline bool empty(void) const { return (m_head_ptr->next == NULL); }
00189     
00191     std::size_t size(void) const {
00192         return m_size;
00193     }
00194     
00196     void clear(void) {
00197         boost::mutex::scoped_lock tail_lock(m_tail_mutex);
00198         boost::mutex::scoped_lock head_lock(m_head_mutex);
00199         // also delete dummy node and reinitialize it to clear old value
00200         while (m_head_ptr) {
00201             m_tail_ptr = m_head_ptr;
00202             m_head_ptr = m_head_ptr->next;
00203             destroyNode(m_tail_ptr);
00204             if (m_head_ptr)
00205                 --m_size;
00206         }
00207         initialize();
00208     }
00209 
00215     void push(const T& t) {
00216         // sleep while MaxSize is exceeded
00217         if (MaxSize > 0) {
00218             boost::system_time wakeup_time;
00219             while (size() >= MaxSize) {
00220                 wakeup_time = boost::get_system_time()
00221                     + boost::posix_time::millisec(SleepMilliSec);
00222                 boost::thread::sleep(wakeup_time);
00223             }
00224         }
00225 
00226         // create a new list node for the queue item
00227         QueueNode *node_ptr = createNode();
00228         node_ptr->data = t;
00229         node_ptr->next = NULL;
00230         node_ptr->version = 0;
00231 
00232         // append node to the end of the list
00233         boost::mutex::scoped_lock tail_lock(m_tail_mutex);
00234         node_ptr->version = (m_next_version += 2);
00235         m_tail_ptr->next = node_ptr;
00236 
00237         // update the tail pointer for the new node
00238         m_tail_ptr = node_ptr;
00239 
00240         // increment size
00241         ++m_size;
00242 
00243         // wake up an idle thread (if any)
00244         if (m_idle_ptr) {
00245             ConsumerThread *idle_ptr = m_idle_ptr;
00246             m_idle_ptr = m_idle_ptr->m_next_ptr;
00247             idle_ptr->m_wakeup_event.notify_one();
00248         }
00249     }
00250     
00261     bool pop(T& t, ConsumerThread& thread_info) {
00262         boost::uint32_t last_known_version;
00263         while (thread_info.isRunning()) {
00264             // try to get the next value
00265             if ( dequeue(t, last_known_version) )
00266                 return true;    // got an item
00267 
00268             // queue is empty
00269             boost::mutex::scoped_lock tail_lock(m_tail_mutex);
00270             if (m_tail_ptr->version == last_known_version) {
00271                 // still empty after acquiring lock
00272                 thread_info.m_next_ptr = m_idle_ptr;
00273                 m_idle_ptr = & thread_info;
00274                 // get wakeup time (if any)
00275                 if (thread_info.hasWakeupTimer()) {
00276                     // wait for an item to become available
00277                     const boost::posix_time::ptime wakeup_time(boost::get_system_time() + thread_info.getWakeupTimer());
00278                     if (!thread_info.m_wakeup_event.timed_wait(tail_lock, wakeup_time))
00279                         return false;   // timer expired if timed_wait() returns false
00280                 } else {
00281                     // wait for an item to become available
00282                     thread_info.m_wakeup_event.wait(tail_lock);
00283                 }
00284             }
00285         }
00286         return false;
00287     }
00288 
00296     inline bool pop(T& t) { boost::uint32_t version; return dequeue(t, version); }
00297     
00298 
00299 private:
00300 
00301 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
00302 
00303     boost::lockfree::caching_freelist<QueueNode>    m_free_list;
00304 #endif
00305     
00307     boost::mutex                    m_head_mutex;
00308 
00310     boost::mutex                    m_tail_mutex;
00311 
00313     QueueNode *                     m_head_ptr;
00314 
00316     QueueNode *                     m_tail_ptr;
00317 
00319     ConsumerThread *                m_idle_ptr;
00320 
00322     boost::uint32_t                 m_next_version;
00323     
00325     boost::detail::atomic_count     m_size;
00326 };
00327 
00328     
00329 }   // end namespace pion
00330 
00331 #endif