Fawkes API  Fawkes Development Version
interruptible_barrier.cpp
1 
2 /***************************************************************************
3  * interruptible_barrier.cpp - Interruptible Barrier
4  *
5  * Created: Sat Jan 31 12:30:32 2009
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <core/exceptions/system.h>
25 #include <core/macros.h>
26 #include <core/threading/interruptible_barrier.h>
27 #include <core/threading/mutex.h>
28 #include <core/threading/thread_list.h>
29 #include <core/threading/wait_condition.h>
30 
31 namespace fawkes {
32 
33 /// @cond INTERNALS
34 class InterruptibleBarrierData
35 {
36 public:
37  unsigned int threads_left;
38  Mutex * mutex;
39  WaitCondition *waitcond;
40  bool own_mutex;
41 
42  InterruptibleBarrierData(Mutex *mutex)
43  {
44  if (mutex) {
45  this->mutex = mutex;
46  own_mutex = false;
47  } else {
48  this->mutex = new Mutex();
49  own_mutex = true;
50  }
51  waitcond = new WaitCondition(this->mutex);
52  }
53 
54  ~InterruptibleBarrierData()
55  {
56  if (own_mutex)
57  delete mutex;
58  delete waitcond;
59  }
60 };
61 /// @endcond
62 
/** @class InterruptibleBarrier <core/threading/interruptible_barrier.h>
 * A barrier is a synchronization tool which blocks until a given number of
 * threads have reached the barrier. This particular implementation allows for
 * giving a timeout after which the waiting is aborted.
67  *
68  * For general information when a barrier is useful see the Barrier class.
69  *
70  * Additionally to the general barrier features the InterruptibleBarrier::wait()
71  * can be given a timeout after which the waiting is aborted.
72  * Since the POSIX standard does not provide a timed wait for barriers this
73  * implementation uses a Mutex and WaitCondition internally to achieve the
74  * desired result.
75  *
76  * @see Barrier
77  * @ingroup Threading
78  * @author Tim Niemueller
79  */
80 
81 /** Constructor.
82  * @param count the number of threads to wait for
83  */
85 {
86  _count = count;
87  if (_count == 0) {
88  throw Exception("Barrier count must be at least 1");
89  }
90  data_ = new InterruptibleBarrierData(NULL);
91  data_->threads_left = 0;
92  passed_threads_ = RefPtr<ThreadList>(new ThreadList());
93 
94  interrupted_ = false;
95  timeout_ = false;
96  num_threads_in_wait_function_ = 0;
97 }
98 
99 /** Constructor with custom mutex.
100  * Use this constructor only if you really know what you are doing. This constructor
101  * allows to pass a mutex that is used internally for the barrier. Note that in
102  * this case it is your duty to lock the mutex before the wait() and unlock
103  * afterwards! It combines features of a barrier and a wait condition.
104  * @param mutex Mutex to use
105  * @param count the number of threads to wait for
106  */
107 InterruptibleBarrier::InterruptibleBarrier(Mutex *mutex, unsigned int count) : Barrier(count)
108 {
109  _count = count;
110  if (_count == 0) {
111  throw Exception("Barrier count must be at least 1");
112  }
113  data_ = new InterruptibleBarrierData(mutex);
114  data_->threads_left = 0;
115  passed_threads_ = RefPtr<ThreadList>(new ThreadList());
116 
117  interrupted_ = false;
118  timeout_ = false;
119  num_threads_in_wait_function_ = 0;
120 }
121 
122 /** Invalid constructor.
123  * This will throw an exception if called as it is illegal to copy
124  * a barrier.
125  * @param barrier to copy
126  */
128 {
129  throw Exception("Barriers cannot be copied");
130 }
131 
132 /** Invalid constructor.
133  * This will throw an exception if called as it is illegal to copy
134  * a barrier.
135  * @param barrier to copy
136  */
137 InterruptibleBarrier::InterruptibleBarrier(const InterruptibleBarrier *b) : Barrier()
138 {
139  throw Exception("Barriers cannot be copied");
140 }
141 
142 /** Invalid assignment operator.
143  * This will throw an exception if called as it is illegal to assign
144  * a barrier.
145  * @param barrier to copy
146  */
147 InterruptibleBarrier &
148 InterruptibleBarrier::operator=(const InterruptibleBarrier &b)
149 {
150  throw Exception("Barriers cannot be assigned");
151 }
152 
153 /** Invalid assignment operator.
154  * This will throw an exception if called as it is illegal to assign
155  * a barrier.
156  * @param barrier to copy
157  */
158 InterruptibleBarrier &
159 InterruptibleBarrier::operator=(const InterruptibleBarrier *b)
160 {
161  throw Exception("Barriers cannot be assigned");
162 }
163 
164 /** Destructor */
166 {
167  delete data_;
168 }
169 
170 /** Get a list of threads that passed the barrier.
171  * The list contains the threads that passed the barrier. With some book keeping
172  * outside of the barrier you can determine which threads you expected at the
173  * barrier but did not pass it.
174  * @return refptr to list of threads that passed the barrier.
175  */
178 {
179  return passed_threads_;
180 }
181 
182 /** Interrupt the barrier.
183  * This will cause all threads currently waiting on the barrier to
184  * throw an exception and no further thread will wait.
185  * You have to call reset() the before you use this barrier
186  * the next time.
187  */
188 void
190 {
191  if (likely(data_->own_mutex))
192  data_->mutex->lock();
193  interrupted_ = true;
194  data_->waitcond->wake_all();
195  if (likely(data_->own_mutex))
196  data_->mutex->unlock();
197 }
198 
199 /** Clears the barrier.
200  * Call this method when you want to use the barrier the next time after
201  * an interrupt or timeout occured. Make sure all threads that should have
202  * passed the barrier the last time did pass it.
203  */
204 void
206 {
207  if (likely(data_->own_mutex))
208  data_->mutex->lock();
209  interrupted_ = false;
210  timeout_ = false;
211  data_->threads_left = _count;
212  passed_threads_.clear();
213  if (likely(data_->own_mutex))
214  data_->mutex->unlock();
215 }
216 
217 /** Wait for other threads.
218  * This method will block until as many threads have called wait as you have
219  * given count to the constructor. Note that if the barrier is interrupted or
220  * times out you need to call reset() to get the barrier into a re-usable state.
221  * It is your duty to make sure that all threads using the barrier are in a
222  * cohesive state.
223  * @param timeout_sec relative timeout in seconds, added to timeout_nanosec
224  * @param timeout_nanosec timeout in nanoseconds
225  * @return true, if the barrier was properly reached, false if the barrier timeout
226  * was reached and the wait did not finish properly.
227  * @exception InterruptedException thrown if the barrier was forcefully interrupted
228  * by calling interrupt().
229  */
230 bool
231 InterruptibleBarrier::wait(unsigned int timeout_sec, unsigned int timeout_nanosec)
232 {
233  if (likely(data_->own_mutex))
234  data_->mutex->lock();
235  num_threads_in_wait_function_++;
236 
237  if (data_->threads_left == 0) {
238  // first to come
239  timeout_ = interrupted_ = wait_at_barrier_ = false;
240  data_->threads_left = _count;
241  passed_threads_->clear();
242  } else {
243  if (interrupted_ || timeout_) {
244  // interrupted or timed out threads need to be reset if they should be reused
245  num_threads_in_wait_function_--;
246  if (likely(data_->own_mutex))
247  data_->mutex->unlock();
248  return true;
249  }
250  }
251 
252  --data_->threads_left;
253  try {
254  passed_threads_->push_back_locked(Thread::current_thread());
255  } catch (Exception &e) {
256  // Cannot do anything more useful :-/
257  // to stay fully compatible with Barrier we do *not* re-throw
258  e.print_trace();
259  }
260 
261  bool local_timeout = false;
262 
263  //Am I the last thread the interruptable barrier is waiting for? Then I can wake the others up.
264  bool waker = (data_->threads_left == 0);
265 
266  while (data_->threads_left && !interrupted_ && !timeout_ && !local_timeout) {
267  //Here, the threads are waiting for the barrier
268  //pthread_cond_timedwait releases data_->mutex if it is not external
269  local_timeout = !data_->waitcond->reltimed_wait(timeout_sec, timeout_nanosec);
270  //before continuing, pthread_cond_timedwait locks data_->mutex again if it is not external
271  }
272 
273  if (local_timeout) {
274  //set timeout flag of the interruptable barrier so the other threads can continue
275  timeout_ = true;
276  }
277 
278  if (interrupted_) {
279  if (likely(data_->own_mutex))
280  data_->mutex->unlock();
281  throw InterruptedException("InterruptibleBarrier forcefully interrupted, only "
282  "%u of %u threads reached the barrier",
283  _count - data_->threads_left,
284  _count);
285  }
286 
287  if (waker) {
288  //all threads of this barrier have to synchronize at the standard Barrier
289  wait_at_barrier_ = true;
290  }
291 
292  if (waker || local_timeout) {
293  //the other threads can stop waiting in th while-loop
294  data_->waitcond->wake_all();
295  }
296 
297  if (likely(data_->own_mutex))
298  data_->mutex->unlock();
299 
300  if (wait_at_barrier_) {
301  //hard synchronization
302  Barrier::wait();
303  }
304 
305  if (likely(data_->own_mutex))
306  data_->mutex->lock();
307  //increment is not threadsafe
308  num_threads_in_wait_function_--;
309  if (likely(data_->own_mutex))
310  data_->mutex->unlock();
311 
312  return !timeout_;
313 }
314 
315 /** Checks if there are no more threads in the wait() function.
316  * This method is used to prevent the destruction of the barrier
317  * while there are threads in wait().
318  * @return true, if no thread currently is in wait()
319  */
320 bool
322 {
323  if (likely(data_->own_mutex))
324  data_->mutex->lock();
325  bool res = num_threads_in_wait_function_ == 0;
326  if (likely(data_->own_mutex))
327  data_->mutex->unlock();
328 
329  return res;
330 }
331 
332 } // end namespace fawkes
unsigned int count()
Get number of threads this barrier will wait for.
Definition: barrier.cpp:176
void interrupt()
Interrupt the barrier.
virtual void wait()
Wait for other threads.
bool no_threads_in_wait()
Checks if there are no more threads in the wait() function.
Fawkes library namespace.
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:153
virtual ~InterruptibleBarrier()
Destructor.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
List of threads.
Definition: thread_list.h:55
Base class for exceptions in Fawkes.
Definition: exception.h:35
InterruptibleBarrier(unsigned int count)
Constructor.
unsigned int _count
Number of threads that are expected to wait for the barrier.
Definition: barrier.h:47
static Thread * current_thread()
Get the Thread instance of the currently running thread.
Definition: thread.cpp:1366
The current system call has been interrupted (for instance by a signal).
Definition: system.h:38
void print_trace()
Prints trace to stderr.
Definition: exception.cpp:601
RefPtr< ThreadList > passed_threads()
Get a list of threads that passed the barrier.
RefPtr<> is a reference-counting shared smartpointer.
Definition: refptr.h:49
void reset()
Clears the barrier.
Mutex mutual exclusion lock.
Definition: mutex.h:32
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:31