pion-net  4.0.9
common/include/pion/PionLockFreeQueue.hpp
00001 // -----------------------------------------------------------------------
00002 // pion-common: a collection of common libraries used by the Pion Platform
00003 // -----------------------------------------------------------------------
00004 // Copyright (C) 2007-2008 Atomic Labs, Inc.  (http://www.atomiclabs.com)
00005 //
00006 // Distributed under the Boost Software License, Version 1.0.
00007 // See http://www.boost.org/LICENSE_1_0.txt
00008 //
00009 
00010 #ifndef __PION_PIONLOCKFREEQUEUE_HEADER__
00011 #define __PION_PIONLOCKFREEQUEUE_HEADER__
00012 
00013 #ifndef PION_HAVE_LOCKFREE
00014     #error "PionLockFreeQueue requires the boost::lockfree library!"
00015 #endif
00016 #ifdef _MSC_VER
00017     #include <iso646.h>
00018     #pragma warning(push)
00019     #pragma warning(disable: 4800) // forcing value to bool 'true' or 'false' (performance warning)
00020 #endif
00021 #include <boost/lockfree/detail/tagged_ptr.hpp>
00022 #ifdef _MSC_VER
00023     #pragma warning(pop)
00024 #endif
00025 #include <boost/lockfree/detail/cas.hpp>
00026 #include <boost/lockfree/detail/freelist.hpp>
00027 #include <boost/lockfree/detail/branch_hints.hpp>
00028 #include <boost/detail/atomic_count.hpp>
00029 #include <boost/noncopyable.hpp>
00030 #include <boost/thread/thread.hpp>
00031 #include <pion/PionConfig.hpp>
00032 //#include <boost/array.hpp>
00033 //#include <boost/cstdint.hpp>
00034 //#include <boost/static_assert.hpp>
00035 
00036 
00037 // NOTE: the data structures contained in this file are based upon algorithms
00038 // published in the paper "Simple, Fast, and Practical Non-Blocking and Blocking
00039 // Concurrent Queue Algorithms" (1996, Maged M. Michael and Michael L. Scott,
00040 // Department of Computer Science, University of Rochester).
00041 // See http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
00042 
00043 
00044 namespace pion {    // begin namespace pion
00045 
00046 
/// PionLockFreeQueue: an unbounded first-in/first-out queue of T values.
///
/// Items live in a singly linked list of QueueNode objects.  m_head_ptr always
/// points at a dummy node (the node *before* the first real item) and m_tail_ptr
/// points at the last node, although it may briefly lag behind it.  push() and
/// pop() publish their changes with compare-and-swap (CAS) operations on tagged
/// pointers and retry when another thread got there first, following the
/// Michael & Scott algorithm referenced earlier in this file.
///
/// Requirements on T that follow from the code below: it must be default
/// constructible (createNode() default-constructs every node) and copy
/// assignable (push() and pop() copy values with operator=).
///
/// clear() and the destructor change the head with plain assignments rather
/// than CAS, so they must only run while no other thread is using the queue.
00050 template <typename T>
00051 class PionLockFreeQueue :
00052     private boost::noncopyable
00053 {
00054 protected:
00055     
          /// a node of the singly linked list that backs the queue
00057     struct QueueNode {
              /// builds a node with no successor and a default-constructed value
00059         QueueNode(void) : next(NULL) {}
00060         
              /// builds a node with no successor that holds a copy of d
00062         QueueNode(const T& d) : next(NULL), data(d) {}
00063         
              /// tagged pointer to the node that follows this one (NULL pointer if none)
00065         boost::lockfree::tagged_ptr<QueueNode>  next;
00066 
              /// the value carried by this node
00068         T   data;
00069     };
00070     
          /// tagged pointer type used for the head, the tail and local snapshots of them
00072     typedef boost::lockfree::tagged_ptr<QueueNode>  QueueNodePtr;
00073     
          /// takes raw storage from the free list and default-constructs a node in it
00075     inline QueueNode *createNode(void) {
00076         QueueNode *node_ptr = m_free_list.allocate();
00077         new(node_ptr) QueueNode();
00078         return node_ptr;
00079     }
00080     
          /// runs the node's destructor and hands its storage back to the free list
00082     inline void destroyNode(QueueNode *node_ptr) {
00083         node_ptr->~QueueNode();
00084         m_free_list.deallocate(node_ptr);
00085     }
00086     
00087     
00088 public:
00089     
          /// constructs an empty queue (head and tail both point at one dummy node)
00091     PionLockFreeQueue(void) : m_size(0) {
00092         // initialize with a dummy node since m_head_ptr is always 
00093         // pointing to the item before the head of the list
00094         QueueNode *dummy_ptr = createNode();
00095         m_head_ptr.set_ptr(dummy_ptr);
00096         m_tail_ptr.set_ptr(dummy_ptr);
00097     }
00098     
          /// destroys all remaining items and then the dummy node; not thread-safe
00100     virtual ~PionLockFreeQueue() {
00101         clear();
00102         destroyNode(m_head_ptr.get_ptr());
00103     }
00104     
          /// returns true if head and tail currently point at the same node.  This is
          /// only a snapshot: other threads may push or pop right after (or during) it
00106     inline bool empty(void) const {
00107         return (m_head_ptr.get_ptr() == m_tail_ptr.get_ptr());
00108     }
00109     
          /// returns the number of items counted in the queue (a snapshot, like empty())
00111     inline std::size_t size(void) const {
              // push() and pop() adjust the counter only after their CAS has succeeded,
              // so a pop() can be counted before the push() that supplied its item;
              // report that transient negative count as 0 instead of letting it wrap
              // around to a huge unsigned value
              const long item_count = m_size;
00112         return (item_count > 0) ? static_cast<std::size_t>(item_count) : 0;
00113     }
00114     
          /// removes every item from the queue.  NOT thread-safe: the head is advanced
          /// with a plain assignment, so no other thread may use the queue meanwhile
00117     void clear(void) {
00118         while (! empty()) {
00119             QueueNodePtr node_ptr(m_head_ptr);
00120             m_head_ptr = m_head_ptr->next;
00121             destroyNode(node_ptr.get_ptr());
00122             --m_size;
00123         }
00124     }
00125     
          /**
           * appends a copy of an item to the back of the queue, retrying until the
           * new node has been linked in
           *
           * @param t the item to add; it is copy-assigned into a newly created node
           */
00131     inline void push(const T& t) {
00132         // create a new list node for the queue item
00133         QueueNode *node_ptr = createNode();
00134         node_ptr->data = t;
00135         
00136         while (true) {
00137             // get copy of tail pointer
00138             QueueNodePtr tail_ptr(m_tail_ptr);
00139             //boost::lockfree::memory_barrier();
00140 
00141             // get copy of tail's next pointer
00142             QueueNodePtr next_ptr(tail_ptr->next);
00143             boost::lockfree::memory_barrier();
00144             
00145             // make sure that the tail pointer has not changed since reading next
00146             if (boost::lockfree::likely(tail_ptr == m_tail_ptr)) {
00147                 // check if tail was pointing to the last node
00148                 if (next_ptr.get_ptr() == NULL) {
00149                     // try to link the new node at the end of the list
00150                     if (tail_ptr->next.cas(next_ptr, node_ptr)) {
00151                         // done with enqueue; try to swing tail to the inserted node
                              // (if this CAS fails another thread already moved the tail)
00152                         m_tail_ptr.cas(tail_ptr, node_ptr);
00153                         break;
00154                     }
00155                 } else {
00156                     // try to swing tail to the next node
00157                     m_tail_ptr.cas(tail_ptr, next_ptr.get_ptr());
00158                 }
00159             }
00160         }
00161 
00162         // increment size
00163         ++m_size;
00164     }   
00165     
          /**
           * removes the item at the front of the queue, if there is one
           *
           * @param t assigned a copy of the front item.  The copy is made before the
           *          head CAS, so t may also be overwritten by an attempt that loses
           *          the CAS and is retried, even if the call finally returns false
           *
           * @return true if an item was retrieved, false if the queue was empty
           */
00173     inline bool pop(T& t) {
00174         while (true) {
00175             // get copy of head pointer
00176             QueueNodePtr head_ptr(m_head_ptr);
00177             //boost::lockfree::memory_barrier();
00178 
00179             // get copy of tail pointer
00180             QueueNodePtr tail_ptr(m_tail_ptr);
00181             QueueNode *next_ptr = head_ptr->next.get_ptr();
00182             boost::lockfree::memory_barrier();
00183             
00184             // check consistency of head pointer
00185             if (boost::lockfree::likely(head_ptr == m_head_ptr)) {
00186 
00187                 // check if queue is empty, or tail is falling behind
00188                 if (head_ptr.get_ptr() == tail_ptr.get_ptr()) {
00189                     // is queue empty?
00190                     if (next_ptr == NULL)
00191                         return false;   // queue is empty
00192                     
00193                     // not empty; try to advance tail to catch it up
00194                     m_tail_ptr.cas(tail_ptr, next_ptr);
00195 
00196                 } else {
00197                     // tail is OK
00198                     // read value before CAS, otherwise another dequeue
00199                     //   might free the next node
00200                     t = next_ptr->data;
00201                     
00202                     // try to swing head to the next node
00203                     if (m_head_ptr.cas(head_ptr, next_ptr)) {
00204                         // success -> nuke the old head item
                              // (next_ptr's node now serves as the new dummy node)
00205                         destroyNode(head_ptr.get_ptr());
00206                         break;  // exit loop
00207                     }
00208                 }
00209             }
00210         }
00211         
00212         // decrement size
00213         --m_size;
00214 
00215         // item successfully retrieved
00216         return true;
00217     }
00218 
00219     
00220 private:
00221     
          /// free list type that supplies and recycles storage for QueueNode objects
00223     typedef boost::lockfree::caching_freelist<QueueNode>    NodeFreeList;
00224 
00225     
          /// item counter; updated by push(), pop() and clear() after the list itself
          /// has changed, so it can briefly be out of step with the list
00227     boost::detail::atomic_count     m_size;
00228 
          /// source of storage for the list nodes
00230     NodeFreeList        m_free_list;
00231     
          /// points at the dummy node that precedes the first item in the list
00233     QueueNodePtr        m_head_ptr;
00234     
          /// points at the last node in the list (push() and pop() help it catch up
          /// when it lags).  It is given 64-byte alignment so that it does not share
          /// a cache line with m_head_ptr (64 bytes is the assumed cache-line size)
00236 #ifdef _MSC_VER
          // request the alignment explicitly: "#pragma pack" can only lower member
          // alignment, and without a matching pop it would also change the packing
          // of every declaration that follows this header
00237     /* force head_ and tail_ to different cache lines! */
00238     __declspec(align(64)) QueueNodePtr  m_tail_ptr;
00239 #else
00240     QueueNodePtr        m_tail_ptr __attribute__((aligned(64))); /* force head_ and tail_ to different cache lines! */
00241 #endif
00242 };
00243 
00244 
00245 
00246     
00247 #if 0
00248 
00249 
00250 
00251 
// Alternate, fixed-capacity implementation of PionLockFreeQueue.  It sits inside
// the surrounding "#if 0" region, so none of it is compiled.
//
// Nodes are preallocated in m_nodes and referred to by 16-bit index instead of
// by pointer; index 0 stands for NULL.  Each "pointer" pairs an index with a
// 16-bit counter that cas() increments on every update, and the pair is
// compared and swapped as a single 32-bit value.
//
// Template parameters:
//   T             - type of the queued items (copied with operator=)
//   MaxSize       - number of nodes the constructor places on the free stack
//   SleepMilliSec - how long acquireNode() sleeps between checks while the
//                   free stack is empty
//
// NOTE(review): this code uses boost::array, boost::uint16_t/uint32_t and
// BOOST_STATIC_ASSERT, whose #include lines are commented out at the top of
// the file; they would have to be restored before re-enabling this block.
00252 template <typename T,
00253     boost::uint16_t MaxSize = 50000,
00254     boost::uint32_t SleepMilliSec = 10 >
00255 class PionLockFreeQueue :
00256     private boost::noncopyable
00257 {
00258 protected:
00259 
          // the index/counter pair below must fit inside one 32-bit value
00261     BOOST_STATIC_ASSERT(sizeof(boost::uint32_t) >= (sizeof(boost::uint16_t) * 2));
00262 
          // an index-based "pointer" to a node, readable either as its two fields
          // or as one integer
00264     union QueueNodePtr {
              // the pointer seen as separate fields
00266         struct {
                  // position of the node in m_nodes (0 is used as NULL)
00268             boost::uint16_t     index;
                  // incremented by cas() each time the pointer is replaced
00270             boost::uint16_t     counter;
00271         } data;
              // the same bits as a single integer; this is what cas() compares and swaps
00273         boost::int32_t      value;
00274     };  
00275 
          // a queue node: one item plus the index-based link to the node after it
00277     struct QueueNode {
              // starts the node with a zeroed (NULL) next pointer
00279         QueueNode(void) { m_next.value = 0; }
              // the item stored in this node
00281         T                           m_data;
              // link to the following node; index 0 means there is none
00283         volatile QueueNodePtr       m_next;
00284     };
00285     
          // returns a reference to the node in m_nodes that node_ptr's index selects
00293     inline QueueNode& getQueueNode(QueueNodePtr node_ptr) {
00294         return m_nodes[node_ptr.data.index];
00295     }
00296     
          // Compare-and-swap helper for index-based pointers.
          //   cur_ptr   - the shared pointer to update
          //   old_ptr   - the value cur_ptr is expected to hold
          //   new_index - index to store; the stored counter is old_ptr's counter + 1
          // Returns the result of boost::lockfree::cas() on the 32-bit values.
00306     static inline bool cas(volatile QueueNodePtr& cur_ptr, QueueNodePtr old_ptr,
00307         boost::uint16_t new_index)
00308     {
00309         QueueNodePtr new_ptr;
00310         new_ptr.data.index = new_index;
00311         new_ptr.data.counter = old_ptr.data.counter + 1;
00312         return boost::lockfree::cas(&(cur_ptr.value), old_ptr.value, new_ptr.value);
00313     }
00314     
          // Takes one node index off the free stack and returns it.  While the free
          // stack is empty (m_free_ptr index is 0) the caller sleeps SleepMilliSec
          // milliseconds at a time, so this call blocks when no node is available.
00316     inline boost::uint16_t acquireNode(void) {
00317         QueueNodePtr    current_free_ptr;
00318         boost::uint16_t new_free_index;
00319         boost::uint16_t avail_index;
00320 
00321         while (true) {
00322             while (true) {
00323                 // get current free_ptr value
00324                 current_free_ptr.value = m_free_ptr.value;
00325                 // check if current free_ptr value == 0
00326                 if (current_free_ptr.data.index > 0)
00327                     break;
00328                 // sleep while MaxSize is exceeded
00329                 boost::system_time wakeup_time = boost::get_system_time()
00330                     + boost::posix_time::millisec(SleepMilliSec);
00331                 boost::thread::sleep(wakeup_time);
00332             }
00333 
00334             // prepare what will become the new free_ptr index value
00335             new_free_index = current_free_ptr.data.index - 1;
00336             
00337             // optimisticly get the next available node index
00338             avail_index = m_free_nodes[new_free_index];
00339 
00340             // use cas operation to update free_ptr value
                  // NOTE(review): a slot holding 0 presumably means a concurrent
                  // releaseNode() has claimed it but not yet stored its index -- confirm
00341             if (avail_index != 0
00342                 && cas(m_free_ptr, current_free_ptr, new_free_index))
00343             {
00344                 m_free_nodes[new_free_index] = 0;
00345                 break;  // cas successful - all done!
00346             }
00347         }
00348         
00349         return avail_index;
00350     }
00351 
          // Puts node_index back on the free stack: claims the next slot by advancing
          // m_free_ptr with cas(), then stores the index in the claimed slot.
00353     inline void releaseNode(const boost::uint16_t node_index) {
00354         QueueNodePtr    current_free_ptr;
00355         boost::uint16_t new_free_index;
00356 
00357         while (true) {
00358             // get current free_ptr value
00359             current_free_ptr.value = m_free_ptr.value;
00360 
00361             // prepare what will become the new free_ptr index value
00362             new_free_index = current_free_ptr.data.index + 1;
00363 
00364             // use cas operation to update free_ptr value
                  // (only a slot that currently holds 0 is treated as claimable)
00365             if (m_free_nodes[current_free_ptr.data.index] == 0
00366                 && cas(m_free_ptr, current_free_ptr, new_free_index))
00367             {
00368                 // push the available index value into the next free position
00369                 m_free_nodes[current_free_ptr.data.index] = node_index;
00370                 
00371                 // all done!
00372                 break;
00373             }
00374         }
00375     }
00376     
00377 
00378 public:
00379 
          // nothing to release explicitly: all storage is held in member arrays
00381     virtual ~PionLockFreeQueue() {}
00382 
          // Builds an empty queue: head and tail both refer to node 1, and nodes
          // 2 .. MaxSize+1 are pushed onto the free stack.
00384     PionLockFreeQueue(void)
00385     {
00386         // point head and tail to the node at index 1 (0 is reserved for NULL)
00387         m_head_ptr.data.index = m_tail_ptr.data.index = 1;
00388         m_head_ptr.data.counter = m_tail_ptr.data.counter = 0;
00389         // initialize free_ptr to zero
00390         m_free_ptr.value = 0;
00391         // initialize free_nodes to zero
00392         for (boost::uint16_t n = 0; n < MaxSize; ++n)
00393             m_free_nodes[n] = 0;
00394         // initialize next values to zero
00395         for (boost::uint16_t n = 0; n < MaxSize+2; ++n)
00396             m_nodes[n].m_next.value = 0;
00397         // push everything but the first two nodes into the available stack
00398         for (boost::uint16_t n = 2; n < MaxSize+2; ++n)
00399             releaseNode(n);
00400     }
00401     
          // returns true when m_free_ptr's index is 0
          // NOTE(review): that index is the depth of the free stack (releaseNode()
          // raises it, acquireNode() lowers it), so this looks like "no free nodes
          // left" rather than "no queued items" -- confirm before re-enabling
00403     inline bool empty(void) const { return m_free_ptr.data.index == 0; }
00404 
          // returns m_free_ptr's index
          // NOTE(review): as with empty(), this appears to be the number of free
          // nodes rather than the number of queued items -- confirm before re-enabling
00406     inline boost::uint16_t size(void) const { return m_free_ptr.data.index; }
00407     
          // Appends a copy of t to the back of the queue.  Obtaining a node goes
          // through acquireNode(), which sleeps while no free node is available.
00413     inline void push(const T& t) {
00414         // retrieve a new list node for the queue item
00415         const boost::uint16_t node_index(acquireNode());
00416 
00417         // prepare it to be added to the list
00418         QueueNode& node_ref = m_nodes[node_index];
00419         node_ref.m_data = t;
00420         node_ref.m_next.data.index = 0;
00421 
00422         // append node to the end of the list
00423         QueueNodePtr tail_ptr;
00424         QueueNodePtr next_ptr;
00425         while (true) {
00426             tail_ptr.value = m_tail_ptr.value;
00427             next_ptr.value = getQueueNode(tail_ptr).m_next.value;
00428             // make sure that the tail pointer has not changed since reading next
00429             if (tail_ptr.value == m_tail_ptr.value) {
00430                 // check if tail was pointing to the last node
00431                 if (next_ptr.data.index == 0) {
00432                     // try to link the new node at the end of the list
00433                     if (cas(getQueueNode(tail_ptr).m_next, next_ptr, node_index))
00434                         break;
00435                 } else {
00436                     // try to swing tail to the next node
00437                     cas(m_tail_ptr, tail_ptr, next_ptr.data.index);
00438                 }
00439             }
00440         }
00441         
00442         // done with enqueue; try to swing tail to the inserted node
00443         cas(m_tail_ptr, tail_ptr, node_index);
00444     }
00445     
          // Removes the item at the front of the queue.
          //   t - assigned a copy of the front item; the copy is made before the head
          //       CAS, so t may be overwritten by an attempt that is then retried
          // Returns true if an item was retrieved, false if the queue was empty.
00453     inline bool pop(T& t) {
00454         QueueNodePtr head_ptr;
00455         QueueNodePtr tail_ptr;
00456         QueueNodePtr next_ptr;
00457 
00458         while (true) {
00459             // read current pointer values
00460             head_ptr.value = m_head_ptr.value;
00461             tail_ptr.value = m_tail_ptr.value;
00462             next_ptr.value = getQueueNode(head_ptr).m_next.value;
00463             // check consistency
00464             if (head_ptr.value == m_head_ptr.value) {
00465                 // check if queue is empty, or tail is falling behind
00466                 if (head_ptr.data.index == tail_ptr.data.index) {
00467                     // is queue empty?
00468                     if (next_ptr.data.index == 0)
00469                         return false;
00470                     // not empty; try to advance tail to catch it up
00471                     cas(m_tail_ptr, tail_ptr, next_ptr.data.index);
00472                 } else {
00473                     // tail is OK
00474                     // read value before CAS, otherwise another dequeue might
00475                     // free the next node
00476                     t = getQueueNode(next_ptr).m_data;
00477                     // try to swing head to the next node
00478                     if (cas(m_head_ptr, head_ptr, next_ptr.data.index))
00479                         break;  // success -> exit loop
00480                 }
00481             }
00482         }
00483 
00484         // item successfully retrieved
              // (the node that used to be the head is returned to the free stack)
00485         releaseNode(const_cast<boost::uint16_t&>(head_ptr.data.index));
00486         return true;
00487     }
00488 
00489 
00490 private:
00491     
          // storage for every node: MaxSize usable nodes plus indexes 0 and 1
          // (0 is reserved as NULL, 1 is the node head and tail start on)
00493     boost::array<QueueNode, MaxSize+2>      m_nodes;
00494     
          // stack of free node indexes; a slot holding 0 is treated as unoccupied
00496     boost::array<volatile boost::uint16_t, MaxSize> m_free_nodes;
00497     
          // index-based pointer to the node in front of the first queued item
00499     volatile QueueNodePtr                   m_head_ptr;
00500 
          // index-based pointer to the last node (push() and pop() help it catch up)
00502     volatile QueueNodePtr                   m_tail_ptr;
00503 
          // its index field is the current depth of the m_free_nodes stack
00505     volatile QueueNodePtr                   m_free_ptr;
00506 };
00507 #endif
00508 
00509 }   // end namespace pion
00510 
00511 #endif