24 #include <core/threading/thread_list.h>
25 #include <core/threading/thread.h>
26 #include <core/threading/mutex.h>
27 #include <core/threading/mutex_locker.h>
28 #include <core/threading/barrier.h>
29 #include <core/threading/interruptible_barrier.h>
30 #include <core/exceptions/software.h>
31 #include <core/exceptions/system.h>
56 append(
"Operation '%s' is not allowed on a sealed thread list", operation);
96 __name = strdup(tlname);
98 __finalize_mutex =
new Mutex();
112 __name = strdup(tlname);
114 __finalize_mutex =
new Mutex();
115 __wnw_barrier = NULL;
117 if ( maintain_barrier) update_barrier();
127 __name = strdup(tl.__name);
128 __sealed = tl.__sealed;
129 __finalize_mutex =
new Mutex();
130 __wnw_barrier = NULL;
131 if ( tl.__wnw_barrier != NULL ) update_barrier();
139 delete __finalize_mutex;
140 delete __wnw_barrier;
152 __name = strdup(tl.__name);
153 __sealed = tl.__sealed;
154 __finalize_mutex =
new Mutex();
155 __wnw_barrier = NULL;
156 if ( tl.__wnw_barrier != NULL ) update_barrier();
168 for (iterator i = begin(); i != end(); ++i) {
181 for (iterator i = begin(); i != end(); ++i) {
195 for (iterator i = begin(); i != end(); ++i) {
196 (*i)->wakeup(barrier);
210 unsigned int count = 1;
211 for (iterator i = begin(); i != end(); ++i) {
212 if ( ! (*i)->flagged_bad() ) {
214 (*i)->wakeup(barrier);
230 if (count != barrier->
count()) {
231 throw Exception(
"ThreadList(%s)::wakeup(): barrier has count (%u) different "
232 "from number of unflagged threads (%u)", __name, barrier->
count(), count);
249 if ( ! __wnw_barrier ) {
251 "barrier is maintained");
261 if ( ! __wnw_barrier->
wait(timeout_sec, timeout_nanosec) ) {
265 for (iterator i = begin(); i != end(); ++i) {
267 for (iterator j = passed_threads->begin(); j != passed_threads->end(); ++j) {
279 __wnw_bad_barriers.push_back(make_pair(__wnw_barrier, bad_threads));
281 __wnw_barrier = NULL;
286 if ( bad_threads.size() > 1 ) {
287 s =
"Multiple threads did not finish in time, flagging as bad: ";
288 for (iterator i = bad_threads.begin(); i != bad_threads.end(); ++i) {
289 s += std::string((*i)->name()) +
" ";
291 }
else if (bad_threads.size() == 0) {
292 s =
"Timeout happened, but no bad threads recorded.";
294 throw Exception(
"Thread %s did not finish in time (max %f), flagging as bad",
295 bad_threads.front()->
name(),
296 (float)timeout_sec + (
float)timeout_nanosec / 1000000000.);
312 delete __wnw_barrier;
313 __wnw_barrier = NULL;
314 if ( maintain_barrier ) update_barrier();
330 bool changed =
false;
331 __wnw_bbit = __wnw_bad_barriers.begin();
332 while (__wnw_bbit != __wnw_bad_barriers.end()) {
333 iterator i = __wnw_bbit->second.begin();
334 while (i != __wnw_bbit->second.end()) {
335 if ( (*i)->waiting() ) {
337 recovered_threads.push_back((*i)->name());
340 i = __wnw_bbit->second.erase(i);
346 if ( __wnw_bbit->second.empty() ) {
347 delete __wnw_bbit->first;
348 __wnw_bbit = __wnw_bad_barriers.erase(__wnw_bbit);
353 if ( changed ) update_barrier();
373 for (ThreadList::iterator i = begin(); i != end(); ++i) {
376 initializer->
init(*i);
378 cite.
append(
"Initialized failed to initialize thread '%s'", (*i)->name());
390 notify_of_failed_init();
391 cite.
append(
"Initializing thread '%s' in list '%s' failed",
392 (*i)->name(), __name);
398 notify_of_failed_init();
399 cite.
append(
"Could not initialize thread '%s'", (*i)->name());
404 }
catch (std::exception &e) {
405 notify_of_failed_init();
406 cite.
append(
"Could not initialize thread '%s'", (*i)->name());
407 cite.
append(
"Caught std::exception or derivative: %s", e.what());
412 notify_of_failed_init();
413 cite.
append(
"Could not initialize thread '%s'", (*i)->name());
414 cite.
append(
"Unknown exception caught");
422 initialized_threads.
finalize(finalizer);
437 for (iterator i = begin(); i != end(); ++i) {
463 for (iterator i = begin(); i != end(); ++i) {
489 for (iterator i = begin(); i != end(); ++i) {
504 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
528 bool can_finalize =
true;
530 bool threw_exception =
false;
531 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
537 can_finalize =
false;
539 if ( ! (*i)->prepare_finalize() ) {
540 can_finalize =
false;
543 cfte.
append(
"Thread '%s' throw an exception while preparing finalization of "
544 "ThreadList '%s'", (*i)->name(), __name);
545 threw_exception =
true;
548 if ( threw_exception ) {
568 Exception me(
"One or more threads failed to finalize");
569 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
574 me.
append(
"Could not finalize thread '%s' in list '%s'", (*i)->name(), __name);
581 me.
append(
"AspectIniFin called Thread[%s]::finalize() which failed", (*i)->name());
584 me.
append(
"AspectIniFin called Thread[%s]::finalize() which failed", (*i)->name());
587 me.
append(
"Thread[%s]::finalize() threw unsupported exception", (*i)->name());
603 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
604 (*i)->cancel_finalize();
621 for (i = begin(); i != end(); ++i) {
622 (*i)->set_prepfin_hold(hold);
627 for (iterator j = begin(); j != i; ++j) {
628 (*j)->set_prepfin_hold(
false);
673 va_start(va, format);
676 if (vasprintf(&tmpname, format, va) != -1) {
716 if ( __wnw_barrier) update_barrier();
735 if ( __wnw_barrier) update_barrier();
749 if ( __wnw_barrier) update_barrier();
768 if ( __wnw_barrier) update_barrier();
781 if ( __wnw_barrier) update_barrier();
794 if ( __wnw_barrier) update_barrier();
808 if ( __wnw_barrier) update_barrier();
819 if ( __wnw_barrier) update_barrier();
830 if ( __wnw_barrier) update_barrier();
844 if ( __wnw_barrier) update_barrier();
851 ThreadList::update_barrier()
853 unsigned int num = 1;
854 for (iterator i = begin(); i != end(); ++i) {
855 if (! (*i)->flagged_bad() ) ++num;
857 delete __wnw_barrier;
858 __wnw_barrier =
new InterruptibleBarrier(num);
864 ThreadList::notify_of_failed_init()
866 for (ThreadList::iterator i = begin(); i != end(); ++i) {
867 (*i)->notify_of_failed_init();
bool sealed()
Check if list is sealed.
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
ThreadListSealedException(const char *operation)
Constructor.
ThreadListNotSealedException(const char *format,...)
Constructor.
LockList< Type > & operator=(const LockList< Type > &ll)
Copy values from another LockList.
const char * name()
Name of the thread list.
unsigned int count()
Get number of threads this barrier will wait for.
virtual void init(Thread *thread)=0
This method is called by the ThreadManager for each newly added Thread.
void clear()
Clear the list.
void start()
Start threads.
void cancel_finalize()
Cancel finalization on all threads.
void push_back_locked(Thread *thread)
Add thread to the end with lock protection.
void seal()
Seal the list.
virtual void finalize(Thread *thread)=0
Finalize a thread.
void set_prepfin_hold(bool hold)
Set prepfin hold on all threads.
void force_stop(ThreadFinalizer *finalizer)
Force stop of all threads.
A NULL pointer was supplied where not allowed.
Thread class encapsulation of pthreads.
virtual void lock() const
Lock list.
void pop_back()
Remove last element.
bool prepare_finalize(ThreadFinalizer *finalizer)
Prepare finalize.
bool wait(unsigned int timeout_sec, unsigned int timeout_nanosec)
Wait for other threads.
Thread list sealed exception.
void push_front_locked(Thread *thread)
Add thread to the front with lock protection.
virtual bool prepare_finalize(Thread *thread)=0
Prepare finalization of a thread.
Thread cannot be initialized.
ThreadList & operator=(const ThreadList &tl)
Assignment operator.
ThreadList(const char *tlname="")
Constructor.
Base class for exceptions in Fawkes.
void remove_locked(Thread *thread)
Remove with lock protection.
ThreadList::iterator erase(iterator pos)
Erase element at given position.
void init(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Initialize threads.
Thread initializer interface.
void wakeup()
Wakeup all threads in list.
void finalize(ThreadFinalizer *finalizer)
Finalize Threads.
void pop_front()
Remove first element.
void append_va(const char *format, va_list va)
Append messages to the message list.
static const unsigned int FLAG_BAD
Standard thread flag: "thread is bad".
void push_back(Thread *thread)
Add thread to the end.
RefPtr< ThreadList > passed_threads()
Get a list of threads that passed the barrier.
RefPtr<> is a reference-counting shared smartpointer.
void wakeup_unlocked()
Wakeup all threads in list.
Thread cannot be finalized.
void set_name(const char *format,...)
Set name of thread.
void try_recover(std::list< std::string > &recovered_threads)
Check if any of the bad barriers recovered.
void set_maintain_barrier(bool maintain_barrier)
Set if this thread list should maintain a barrier.
Mutex mutual exclusion lock.
void push_front(Thread *thread)
Add thread to the front.
void wakeup_and_wait(unsigned int timeout_sec=0, unsigned int timeout_nanosec=0)
Wakeup threads and wait for them to finish.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
System ran out of memory and desired operation could not be fulfilled.
void cancel()
Cancel threads.
void append(const char *format,...)
Append messages to the message list.
void remove(Thread *thread)
Remove with lock protection.
Thread finalizer interface.