// pion-net 4.0.9
00001 // ----------------------------------------------------------------------- 00002 // pion-common: a collection of common libraries used by the Pion Platform 00003 // ----------------------------------------------------------------------- 00004 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com) 00005 // 00006 // Distributed under the Boost Software License, Version 1.0. 00007 // See http://www.boost.org/LICENSE_1_0.txt 00008 // 00009 00010 #ifndef __PION_PIONLOCKFREEQUEUE_HEADER__ 00011 #define __PION_PIONLOCKFREEQUEUE_HEADER__ 00012 00013 #ifndef PION_HAVE_LOCKFREE 00014 #error "PionLockFreeQueue requires the boost::lockfree library!" 00015 #endif 00016 #ifdef _MSC_VER 00017 #include <iso646.h> 00018 #pragma warning(push) 00019 #pragma warning(disable: 4800) // forcing value to bool 'true' or 'false' (performance warning) 00020 #endif 00021 #include <boost/lockfree/detail/tagged_ptr.hpp> 00022 #ifdef _MSC_VER 00023 #pragma warning(pop) 00024 #endif 00025 #include <boost/lockfree/detail/cas.hpp> 00026 #include <boost/lockfree/detail/freelist.hpp> 00027 #include <boost/lockfree/detail/branch_hints.hpp> 00028 #include <boost/detail/atomic_count.hpp> 00029 #include <boost/noncopyable.hpp> 00030 #include <boost/thread/thread.hpp> 00031 #include <pion/PionConfig.hpp> 00032 //#include <boost/array.hpp> 00033 //#include <boost/cstdint.hpp> 00034 //#include <boost/static_assert.hpp> 00035 00036 00037 // NOTE: the data structures contained in this file are based upon algorithms 00038 // published in the paper "Simple, Fast, and Practical Non-Blocking and Blocking 00039 // Concurrent Queue Algorithms" (1996, Maged M. Michael and Michael L. Scott, 00040 // Department of Computer Science, University of Rochester). 
00041 // See http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf 00042 00043 00044 namespace pion { // begin namespace pion 00045 00046 00050 template <typename T> 00051 class PionLockFreeQueue : 00052 private boost::noncopyable 00053 { 00054 protected: 00055 00057 struct QueueNode { 00059 QueueNode(void) : next(NULL) {} 00060 00062 QueueNode(const T& d) : next(NULL), data(d) {} 00063 00065 boost::lockfree::tagged_ptr<QueueNode> next; 00066 00068 T data; 00069 }; 00070 00072 typedef boost::lockfree::tagged_ptr<QueueNode> QueueNodePtr; 00073 00075 inline QueueNode *createNode(void) { 00076 QueueNode *node_ptr = m_free_list.allocate(); 00077 new(node_ptr) QueueNode(); 00078 return node_ptr; 00079 } 00080 00082 inline void destroyNode(QueueNode *node_ptr) { 00083 node_ptr->~QueueNode(); 00084 m_free_list.deallocate(node_ptr); 00085 } 00086 00087 00088 public: 00089 00091 PionLockFreeQueue(void) : m_size(0) { 00092 // initialize with a dummy node since m_head_ptr is always 00093 // pointing to the item before the head of the list 00094 QueueNode *dummy_ptr = createNode(); 00095 m_head_ptr.set_ptr(dummy_ptr); 00096 m_tail_ptr.set_ptr(dummy_ptr); 00097 } 00098 00100 virtual ~PionLockFreeQueue() { 00101 clear(); 00102 destroyNode(m_head_ptr.get_ptr()); 00103 } 00104 00106 inline bool empty(void) const { 00107 return (m_head_ptr.get_ptr() == m_tail_ptr.get_ptr()); 00108 } 00109 00111 inline std::size_t size(void) const { 00112 return m_size; 00113 } 00114 00117 volatile void clear(void) { 00118 while (! 
empty()) { 00119 QueueNodePtr node_ptr(m_head_ptr); 00120 m_head_ptr = m_head_ptr->next; 00121 destroyNode(node_ptr.get_ptr()); 00122 --m_size; 00123 } 00124 } 00125 00131 inline void push(const T& t) { 00132 // create a new list node for the queue item 00133 QueueNode *node_ptr = createNode(); 00134 node_ptr->data = t; 00135 00136 while (true) { 00137 // get copy of tail pointer 00138 QueueNodePtr tail_ptr(m_tail_ptr); 00139 //boost::lockfree::memory_barrier(); 00140 00141 // get copy of tail's next pointer 00142 QueueNodePtr next_ptr(tail_ptr->next); 00143 boost::lockfree::memory_barrier(); 00144 00145 // make sure that the tail pointer has not changed since reading next 00146 if (boost::lockfree::likely(tail_ptr == m_tail_ptr)) { 00147 // check if tail was pointing to the last node 00148 if (next_ptr.get_ptr() == NULL) { 00149 // try to link the new node at the end of the list 00150 if (tail_ptr->next.cas(next_ptr, node_ptr)) { 00151 // done with enqueue; try to swing tail to the inserted node 00152 m_tail_ptr.cas(tail_ptr, node_ptr); 00153 break; 00154 } 00155 } else { 00156 // try to swing tail to the next node 00157 m_tail_ptr.cas(tail_ptr, next_ptr.get_ptr()); 00158 } 00159 } 00160 } 00161 00162 // increment size 00163 ++m_size; 00164 } 00165 00173 inline bool pop(T& t) { 00174 while (true) { 00175 // get copy of head pointer 00176 QueueNodePtr head_ptr(m_head_ptr); 00177 //boost::lockfree::memory_barrier(); 00178 00179 // get copy of tail pointer 00180 QueueNodePtr tail_ptr(m_tail_ptr); 00181 QueueNode *next_ptr = head_ptr->next.get_ptr(); 00182 boost::lockfree::memory_barrier(); 00183 00184 // check consistency of head pointer 00185 if (boost::lockfree::likely(head_ptr == m_head_ptr)) { 00186 00187 // check if queue is empty, or tail is falling behind 00188 if (head_ptr.get_ptr() == tail_ptr.get_ptr()) { 00189 // is queue empty? 
00190 if (next_ptr == NULL) 00191 return false; // queue is empty 00192 00193 // not empty; try to advance tail to catch it up 00194 m_tail_ptr.cas(tail_ptr, next_ptr); 00195 00196 } else { 00197 // tail is OK 00198 // read value before CAS, otherwise another dequeue 00199 // might free the next node 00200 t = next_ptr->data; 00201 00202 // try to swing head to the next node 00203 if (m_head_ptr.cas(head_ptr, next_ptr)) { 00204 // success -> nuke the old head item 00205 destroyNode(head_ptr.get_ptr()); 00206 break; // exit loop 00207 } 00208 } 00209 } 00210 } 00211 00212 // decrement size 00213 --m_size; 00214 00215 // item successfully retrieved 00216 return true; 00217 } 00218 00219 00220 private: 00221 00223 typedef boost::lockfree::caching_freelist<QueueNode> NodeFreeList; 00224 00225 00227 boost::detail::atomic_count m_size; 00228 00230 NodeFreeList m_free_list; 00231 00233 QueueNodePtr m_head_ptr; 00234 00236 #ifdef _MSC_VER 00237 #pragma pack(8) /* force head_ and tail_ to different cache lines! */ 00238 QueueNodePtr m_tail_ptr; 00239 #else 00240 QueueNodePtr m_tail_ptr __attribute__((aligned(64))); /* force head_ and tail_ to different cache lines! 
*/ 00241 #endif 00242 }; 00243 00244 00245 00246 00247 #if 0 00248 00249 00250 00251 00252 template <typename T, 00253 boost::uint16_t MaxSize = 50000, 00254 boost::uint32_t SleepMilliSec = 10 > 00255 class PionLockFreeQueue : 00256 private boost::noncopyable 00257 { 00258 protected: 00259 00261 BOOST_STATIC_ASSERT(sizeof(boost::uint32_t) >= (sizeof(boost::uint16_t) * 2)); 00262 00264 union QueueNodePtr { 00266 struct { 00268 boost::uint16_t index; 00270 boost::uint16_t counter; 00271 } data; 00273 boost::int32_t value; 00274 }; 00275 00277 struct QueueNode { 00279 QueueNode(void) { m_next.value = 0; } 00281 T m_data; 00283 volatile QueueNodePtr m_next; 00284 }; 00285 00293 inline QueueNode& getQueueNode(QueueNodePtr node_ptr) { 00294 return m_nodes[node_ptr.data.index]; 00295 } 00296 00306 static inline bool cas(volatile QueueNodePtr& cur_ptr, QueueNodePtr old_ptr, 00307 boost::uint16_t new_index) 00308 { 00309 QueueNodePtr new_ptr; 00310 new_ptr.data.index = new_index; 00311 new_ptr.data.counter = old_ptr.data.counter + 1; 00312 return boost::lockfree::cas(&(cur_ptr.value), old_ptr.value, new_ptr.value); 00313 } 00314 00316 inline boost::uint16_t acquireNode(void) { 00317 QueueNodePtr current_free_ptr; 00318 boost::uint16_t new_free_index; 00319 boost::uint16_t avail_index; 00320 00321 while (true) { 00322 while (true) { 00323 // get current free_ptr value 00324 current_free_ptr.value = m_free_ptr.value; 00325 // check if current free_ptr value == 0 00326 if (current_free_ptr.data.index > 0) 00327 break; 00328 // sleep while MaxSize is exceeded 00329 boost::system_time wakeup_time = boost::get_system_time() 00330 + boost::posix_time::millisec(SleepMilliSec); 00331 boost::thread::sleep(wakeup_time); 00332 } 00333 00334 // prepare what will become the new free_ptr index value 00335 new_free_index = current_free_ptr.data.index - 1; 00336 00337 // optimisticly get the next available node index 00338 avail_index = m_free_nodes[new_free_index]; 00339 00340 // use cas 
operation to update free_ptr value 00341 if (avail_index != 0 00342 && cas(m_free_ptr, current_free_ptr, new_free_index)) 00343 { 00344 m_free_nodes[new_free_index] = 0; 00345 break; // cas successful - all done! 00346 } 00347 } 00348 00349 return avail_index; 00350 } 00351 00353 inline void releaseNode(const boost::uint16_t node_index) { 00354 QueueNodePtr current_free_ptr; 00355 boost::uint16_t new_free_index; 00356 00357 while (true) { 00358 // get current free_ptr value 00359 current_free_ptr.value = m_free_ptr.value; 00360 00361 // prepare what will become the new free_ptr index value 00362 new_free_index = current_free_ptr.data.index + 1; 00363 00364 // use cas operation to update free_ptr value 00365 if (m_free_nodes[current_free_ptr.data.index] == 0 00366 && cas(m_free_ptr, current_free_ptr, new_free_index)) 00367 { 00368 // push the available index value into the next free position 00369 m_free_nodes[current_free_ptr.data.index] = node_index; 00370 00371 // all done! 00372 break; 00373 } 00374 } 00375 } 00376 00377 00378 public: 00379 00381 virtual ~PionLockFreeQueue() {} 00382 00384 PionLockFreeQueue(void) 00385 { 00386 // point head and tail to the node at index 1 (0 is reserved for NULL) 00387 m_head_ptr.data.index = m_tail_ptr.data.index = 1; 00388 m_head_ptr.data.counter = m_tail_ptr.data.counter = 0; 00389 // initialize free_ptr to zero 00390 m_free_ptr.value = 0; 00391 // initialize free_nodes to zero 00392 for (boost::uint16_t n = 0; n < MaxSize; ++n) 00393 m_free_nodes[n] = 0; 00394 // initialize next values to zero 00395 for (boost::uint16_t n = 0; n < MaxSize+2; ++n) 00396 m_nodes[n].m_next.value = 0; 00397 // push everything but the first two nodes into the available stack 00398 for (boost::uint16_t n = 2; n < MaxSize+2; ++n) 00399 releaseNode(n); 00400 } 00401 00403 inline bool empty(void) const { return m_free_ptr.data.index == 0; } 00404 00406 inline boost::uint16_t size(void) const { return m_free_ptr.data.index; } 00407 00413 inline void 
push(const T& t) { 00414 // retrieve a new list node for the queue item 00415 const boost::uint16_t node_index(acquireNode()); 00416 00417 // prepare it to be added to the list 00418 QueueNode& node_ref = m_nodes[node_index]; 00419 node_ref.m_data = t; 00420 node_ref.m_next.data.index = 0; 00421 00422 // append node to the end of the list 00423 QueueNodePtr tail_ptr; 00424 QueueNodePtr next_ptr; 00425 while (true) { 00426 tail_ptr.value = m_tail_ptr.value; 00427 next_ptr.value = getQueueNode(tail_ptr).m_next.value; 00428 // make sure that the tail pointer has not changed since reading next 00429 if (tail_ptr.value == m_tail_ptr.value) { 00430 // check if tail was pointing to the last node 00431 if (next_ptr.data.index == 0) { 00432 // try to link the new node at the end of the list 00433 if (cas(getQueueNode(tail_ptr).m_next, next_ptr, node_index)) 00434 break; 00435 } else { 00436 // try to swing tail to the next node 00437 cas(m_tail_ptr, tail_ptr, next_ptr.data.index); 00438 } 00439 } 00440 } 00441 00442 // done with enqueue; try to swing tail to the inserted node 00443 cas(m_tail_ptr, tail_ptr, node_index); 00444 } 00445 00453 inline bool pop(T& t) { 00454 QueueNodePtr head_ptr; 00455 QueueNodePtr tail_ptr; 00456 QueueNodePtr next_ptr; 00457 00458 while (true) { 00459 // read current pointer values 00460 head_ptr.value = m_head_ptr.value; 00461 tail_ptr.value = m_tail_ptr.value; 00462 next_ptr.value = getQueueNode(head_ptr).m_next.value; 00463 // check consistency 00464 if (head_ptr.value == m_head_ptr.value) { 00465 // check if queue is empty, or tail is falling behind 00466 if (head_ptr.data.index == tail_ptr.data.index) { 00467 // is queue empty? 
00468 if (next_ptr.data.index == 0) 00469 return false; 00470 // not empty; try to advance tail to catch it up 00471 cas(m_tail_ptr, tail_ptr, next_ptr.data.index); 00472 } else { 00473 // tail is OK 00474 // read value before CAS, otherwise another dequeue might 00475 // free the next node 00476 t = getQueueNode(next_ptr).m_data; 00477 // try to swing head to the next node 00478 if (cas(m_head_ptr, head_ptr, next_ptr.data.index)) 00479 break; // success -> exit loop 00480 } 00481 } 00482 } 00483 00484 // item successfully retrieved 00485 releaseNode(const_cast<boost::uint16_t&>(head_ptr.data.index)); 00486 return true; 00487 } 00488 00489 00490 private: 00491 00493 boost::array<QueueNode, MaxSize+2> m_nodes; 00494 00496 boost::array<volatile boost::uint16_t, MaxSize> m_free_nodes; 00497 00499 volatile QueueNodePtr m_head_ptr; 00500 00502 volatile QueueNodePtr m_tail_ptr; 00503 00505 volatile QueueNodePtr m_free_ptr; 00506 }; 00507 #endif 00508 00509 } // end namespace pion 00510 00511 #endif