Fawkes API Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * interruptible_barrier.cpp - Interruptible Barrier 00004 * 00005 * Created: Sat Jan 31 12:30:32 2009 00006 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <core/threading/interruptible_barrier.h> 00025 #include <core/threading/thread_list.h> 00026 #include <core/exceptions/system.h> 00027 #include <core/macros.h> 00028 00029 #include <core/threading/mutex.h> 00030 #include <core/threading/wait_condition.h> 00031 00032 namespace fawkes { 00033 #if 0 /* just to make Emacs auto-indent happy */ 00034 } 00035 #endif 00036 00037 00038 /// @cond INTERNALS 00039 class InterruptibleBarrierData 00040 { 00041 public: 00042 unsigned int threads_left; 00043 Mutex *mutex; 00044 WaitCondition *waitcond; 00045 bool own_mutex; 00046 00047 InterruptibleBarrierData(Mutex *mutex) 00048 { 00049 if (mutex) { 00050 this->mutex = mutex; 00051 own_mutex = false; 00052 } else { 00053 this->mutex = new Mutex(); 00054 own_mutex = true; 00055 } 00056 waitcond = new WaitCondition(this->mutex); 00057 } 00058 00059 ~InterruptibleBarrierData() 00060 { 00061 if (own_mutex) delete mutex; 00062 delete waitcond; 00063 } 00064 }; 00065 /// @endcond 00066 00067 00068 /** @class InterruptibleBarrier <core/threading/barrier.h> 00069 * A barrier is a synchronization tool which blocks until a given number of 00070 * threads have reached the barrier. This particular implementations allows for 00071 * giving a timeout after which the waiting is aborted. 00072 * 00073 * For general information when a barrier is useful see the Barrier class. 00074 * 00075 * Additionally to the general barrier features the InterruptibleBarrier::wait() 00076 * can be given a timeout after which the waiting is aborted. 00077 * Since the POSIX standard does not provide a timed wait for barriers this 00078 * implementation uses a Mutex and WaitCondition internally to achieve the 00079 * desired result. 00080 * 00081 * @see Barrier 00082 * @ingroup Threading 00083 * @author Tim Niemueller 00084 */ 00085 00086 00087 /** Constructor. 00088 * @param count the number of threads to wait for 00089 */ 00090 InterruptibleBarrier::InterruptibleBarrier(unsigned int count) 00091 : Barrier() 00092 { 00093 _count = count; 00094 if ( _count == 0 ) { 00095 throw Exception("Barrier count must be at least 1"); 00096 } 00097 __data = new InterruptibleBarrierData(NULL); 00098 __data->threads_left = 0; 00099 __passed_threads = RefPtr<ThreadList>(new ThreadList()); 00100 00101 __interrupted = false; 00102 __timeout = false; 00103 } 00104 00105 00106 /** Constructor with custom mutex. 00107 * Use this constructor only if you really know what you are doing. This constructor 00108 * allows to pass a mutex that is used internally for the barrier. Note that in 00109 * this case it is your duty to lock the mutex before the wait() and unlock 00110 * afterwards! It combines features of a barrier and a wait condition. 00111 * @param mutex Mutex to use 00112 * @param count the number of threads to wait for 00113 */ 00114 InterruptibleBarrier::InterruptibleBarrier(Mutex *mutex, unsigned int count) 00115 : Barrier() 00116 { 00117 _count = count; 00118 if ( _count == 0 ) { 00119 throw Exception("Barrier count must be at least 1"); 00120 } 00121 __data = new InterruptibleBarrierData(mutex); 00122 __data->threads_left = 0; 00123 __passed_threads = RefPtr<ThreadList>(new ThreadList()); 00124 00125 __interrupted = false; 00126 __timeout = false; 00127 } 00128 00129 /** Destructor */ 00130 InterruptibleBarrier::~InterruptibleBarrier() 00131 { 00132 delete __data; 00133 } 00134 00135 00136 /** Get a list of threads that passed the barrier. 00137 * The list contains the threads that passed the barrier. With some book keeping 00138 * outside of the barrier you can determine which threads you expected at the 00139 * barrier but did not pass it. 00140 * @return refptr to list of threads that passed the barrier. 00141 */ 00142 RefPtr<ThreadList> 00143 InterruptibleBarrier::passed_threads() 00144 { 00145 return __passed_threads; 00146 } 00147 00148 00149 /** Interrupt the barrier. 00150 * This will cause all threads currently waiting on the barrier to 00151 * throw an exception and no further thread will wait. 00152 * You have to call reset() the before you use this barrier 00153 * the next time. 00154 */ 00155 void 00156 InterruptibleBarrier::interrupt() throw() 00157 { 00158 if (likely(__data->own_mutex)) __data->mutex->lock(); 00159 __interrupted = true; 00160 __data->waitcond->wake_all(); 00161 if (likely(__data->own_mutex)) __data->mutex->unlock(); 00162 } 00163 00164 00165 /** Clears the barrier. 00166 * Call this method when you want to use the barrier the next time after 00167 * an interrupt or timeout occured. Make sure all threads that should have 00168 * passed the barrier the last time did pass it. 00169 */ 00170 void 00171 InterruptibleBarrier::reset() throw() 00172 { 00173 if (likely(__data->own_mutex)) __data->mutex->lock(); 00174 __interrupted = false; 00175 __timeout = false; 00176 __data->threads_left = _count; 00177 __passed_threads.clear(); 00178 if (likely(__data->own_mutex)) __data->mutex->unlock(); 00179 } 00180 00181 00182 /** Wait for other threads. 00183 * This method will block until as many threads have called wait as you have 00184 * given count to the constructor. Note that if the barrier is interrupted or 00185 * times out you need to call reset() to get the barrier into a re-usable state. 00186 * It is your duty to make sure that all threads using the barrier are in a 00187 * cohesive state. 00188 * @param timeout_sec relative timeout in seconds, added to timeout_nanosec 00189 * @param timeout_nanosec timeout in nanoseconds 00190 * @return true, if the barrier was properly reached, false if the barrier timeout 00191 * was reached and the wait did not finish properly. 00192 * @exception InterruptedException thrown if the barrier was forcefully interrupted 00193 * by calling interrupt(). 00194 */ 00195 bool 00196 InterruptibleBarrier::wait(unsigned int timeout_sec, unsigned int timeout_nanosec) 00197 { 00198 if (likely(__data->own_mutex)) __data->mutex->lock(); 00199 00200 if ( __data->threads_left == 0 ) { 00201 // first to come 00202 __timeout = __interrupted = false; 00203 __data->threads_left = _count; 00204 __passed_threads->clear(); 00205 } else { 00206 if ( __interrupted || __timeout ) { 00207 // interrupted or timed out threads need to be reset if they should be reused 00208 if (likely(__data->own_mutex)) __data->mutex->unlock(); 00209 return true; 00210 } 00211 } 00212 --__data->threads_left; 00213 try { 00214 __passed_threads->push_back_locked(Thread::current_thread()); 00215 } catch (Exception &e) { 00216 // Cannot do anything more useful :-/ 00217 // to stay fully compatible with Barrier we do *not* re-throw 00218 e.print_trace(); 00219 } 00220 00221 bool local_timeout = false; 00222 while ( __data->threads_left && !__interrupted && !__timeout && ! local_timeout) { 00223 local_timeout = ! __data->waitcond->reltimed_wait(timeout_sec, timeout_nanosec); 00224 } 00225 if (local_timeout) __timeout = true; 00226 if ( __interrupted ) { 00227 if (likely(__data->own_mutex)) __data->mutex->unlock(); 00228 throw InterruptedException("InterruptibleBarrier forcefully interrupted, only " 00229 "%u of %u threads reached the barrier", 00230 _count - __data->threads_left, _count); 00231 } 00232 00233 __data->waitcond->wake_all(); 00234 if (likely(__data->own_mutex)) __data->mutex->unlock(); 00235 00236 return ! __timeout; 00237 } 00238 00239 } // end namespace fawkes