24 #include <core/threading/thread.h>
25 #include <core/threading/mutex.h>
26 #include <core/threading/mutex_locker.h>
27 #include <core/threading/barrier.h>
28 #include <core/threading/wait_condition.h>
29 #include <core/threading/read_write_lock.h>
30 #include <core/threading/thread_finalizer.h>
31 #include <core/threading/thread_notification_listener.h>
32 #include <core/exceptions/software.h>
33 #include <core/exceptions/system.h>
34 #include <core/utils/lock_list.h>
184 pthread_mutex_t Thread::__thread_key_mutex = PTHREAD_MUTEX_INITIALIZER;
188 pthread_key_t Thread::THREAD_KEY = PTHREAD_KEYS_MAX;
190 #define MAIN_THREAD_NAME "__MainThread__"
215 __constructor(name, op_mode);
240 Thread::__constructor(
const char *name, OpMode op_mode)
244 __prepfin_conc_loop =
false;
245 __coalesce_wakeups =
false;
247 __name = strdup(name);
248 __notification_listeners =
new LockList<ThreadNotificationListener *>();
251 __sleep_mutex =
new Mutex();
252 __sleep_condition =
new WaitCondition(__sleep_mutex);
253 __waiting_for_wakeup =
true;
255 __sleep_condition = NULL;
256 __sleep_mutex = NULL;
257 __waiting_for_wakeup =
false;
265 __delete_on_exit =
false;
266 __prepfin_hold =
false;
267 __pending_wakeups = 0;
273 __loop_done_mutex =
new Mutex();
274 __loop_done_waitcond =
new WaitCondition(__loop_done_mutex);
277 __prepfin_hold_mutex =
new Mutex();
278 __prepfin_hold_waitcond =
new WaitCondition(__prepfin_hold_mutex);
279 __startup_barrier =
new Barrier(2);
289 delete __sleep_condition;
290 delete __sleep_mutex;
293 delete __notification_listeners;
295 delete __startup_barrier;
296 delete __prepfin_hold_mutex;
297 delete __prepfin_hold_waitcond;
298 delete __loop_done_waitcond;
299 delete __loop_done_mutex;
313 throw Exception(
"You may not use copy constructor of class Thread");
322 Thread::operator=(
const Thread &t)
324 throw Exception(
"You may not use assignment operator of class Thread");
387 __prepfin_hold_mutex->
lock();
388 while (__prepfin_hold) {
389 __prepfin_hold_waitcond->
wait();
391 if (! __prepfin_conc_loop) {
397 if (! __prepfin_conc_loop) {
401 __prepfin_hold_mutex->
unlock();
511 throw Exception(
"You cannot start the same thread twice!");
519 if ( (err = pthread_create(&__thread_id, NULL, Thread::entry,
this)) != 0) {
521 throw Exception(
"Could not start thread", err);
524 if (__wait) __startup_barrier->
wait();
529 Thread::lock_sleep_mutex()
532 __sleep_mutex->
lock();
543 Thread::entry(
void *pthis)
551 set_tsd_thread_instance(t);
554 t->lock_sleep_mutex();
557 t->notify_of_startup();
560 if (t->__wait) t->__startup_barrier->wait();
563 t->loop_mutex->lock();
565 t->loop_mutex->unlock();
568 if ( t->__detached ) {
571 t->__started =
false;
586 if ( __delete_on_exit ) {
603 pthread_join(__thread_id, &dont_care);
606 if ( __sleep_mutex != NULL ) {
632 pthread_detach(__thread_id);
642 if ( __started && ! __cancelled ) {
643 if ( pthread_cancel(__thread_id) == 0 ) {
658 pthread_kill(__thread_id, sig);
681 throw Exception(
"Cannot set thread opmode while running");
687 delete __sleep_condition;
688 delete __sleep_mutex;
689 __sleep_condition = NULL;
690 __sleep_mutex = NULL;
693 __sleep_mutex =
new Mutex();
717 __prepfin_conc_loop = concurrent;
733 __coalesce_wakeups = coalesce;
737 __coalesce_wakeups = coalesce;
752 va_start(va, format);
753 char *old_name = __name;
754 if (vasprintf(&__name, format, va) == -1) {
776 __prepfin_hold_mutex->
lock();
778 __prepfin_hold_mutex->
unlock();
779 throw Exception(
"Thread(%s)::set_prepfin_hold: prepare_finalize() has "
780 "been called already()", __name);
782 __prepfin_hold = hold;
784 __prepfin_hold_waitcond->
wake_all();
786 __prepfin_hold_mutex->
unlock();
860 return __waiting_for_wakeup;
870 pthread_testcancel();
898 return ( pthread_equal(__thread_id, thread.__thread_id) != 0 );
923 while (__pending_wakeups == 0) {
924 __waiting_for_wakeup =
true;
925 __sleep_condition->
wait();
927 __pending_wakeups -= 1;
942 __loop_done_mutex->
lock();
944 __loop_done_mutex->
unlock();
950 __sleep_mutex->
lock();
957 __sleep_mutex->
lock();
959 __sleep_mutex->
lock();
962 while (__pending_wakeups == 0) {
963 __waiting_for_wakeup =
true;
964 __sleep_condition->
wait();
966 __pending_wakeups -= 1;
985 throw Exception(
"Thread(%s): wakeup() cannot be called if loop is running "
986 "with barrier already", __name);
989 if (__coalesce_wakeups) __pending_wakeups = 1;
990 else __pending_wakeups += 1;
991 if (__waiting_for_wakeup) {
993 __waiting_for_wakeup =
false;
1010 if ( barrier == NULL ) {
1015 if ( ! __waiting_for_wakeup && __barrier) {
1016 throw Exception(
"Thread %s already running with barrier, cannot wakeup %i %p", __name,
1017 __waiting_for_wakeup, __barrier);
1020 __pending_wakeups += 1;
1021 __barrier = barrier;
1022 if (__waiting_for_wakeup) {
1024 __waiting_for_wakeup =
false;
1034 __loop_done_mutex->
lock();
1035 while (! __loop_done) {
1036 __loop_done_waitcond->
wait();
1038 __loop_done_mutex->
unlock();
1049 if ( __delete_on_exit ) {
1079 __delete_on_exit = del;
1091 return (__pending_wakeups > 0);
1118 __flags &= 0xFFFFFFFF ^ flag;
1150 __notification_listeners->push_back_locked(notification_listener);
1160 __notification_listeners->remove_locked(notification_listener);
1168 Thread::notify_of_startup()
1170 __notification_listeners->lock();
1172 while (i != __notification_listeners->end()) {
1173 if (! (*i)->thread_started(
this)) {
1174 i = __notification_listeners->erase(i);
1179 __notification_listeners->unlock();
1187 Thread::notify_of_failed_init()
1189 __notification_listeners->lock();
1190 LockList<ThreadNotificationListener *>::iterator i = __notification_listeners->begin();
1191 while (i != __notification_listeners->end()) {
1192 if ( ! (*i)->thread_init_failed(
this) ) {
1193 i = __notification_listeners->erase(i);
1198 __notification_listeners->unlock();
1206 Thread::init_thread_key()
1208 pthread_mutex_lock(&__thread_key_mutex);
1209 if ( THREAD_KEY == PTHREAD_KEYS_MAX ) {
1212 if ( (err = pthread_key_create(&THREAD_KEY, NULL)) != 0 ) {
1213 if ( ENOMEM == err ) {
1214 throw OutOfMemoryException(
"Could not create key for thread "
1215 "specific data (reference to thread)");
1217 throw Exception(
"Thread key for reference to thread could not be created", err);
1221 pthread_mutex_unlock(&__thread_key_mutex);
1231 Thread::set_tsd_thread_instance(Thread *t)
1234 if ( (err = pthread_setspecific(THREAD_KEY, t)) != 0 ) {
1235 if ( ENOMEM == err ) {
1236 throw OutOfMemoryException(
"Could not set specific data (reference to thread)");
1238 throw Exception(
"Could not set specific data (reference to thread), unknown reason");
1251 Thread *t =
new Thread(MAIN_THREAD_NAME, pthread_self());
1252 set_tsd_thread_instance(t);
1265 if ( strcmp(t->
name(), MAIN_THREAD_NAME) == 0 ) {
1268 throw Exception(
"Main thread can only be destroyed in main thread");
1281 return pthread_self();
1297 if ( THREAD_KEY == PTHREAD_KEYS_MAX ) {
1298 throw Exception(
"No thread has been initialized");
1300 return (
Thread *)pthread_getspecific(THREAD_KEY);
1313 if ( THREAD_KEY == PTHREAD_KEYS_MAX ) {
1316 return (
Thread *)pthread_getspecific(THREAD_KEY);
1329 int oldstate = PTHREAD_CANCEL_ENABLE;
1330 int newstate = PTHREAD_CANCEL_ENABLE;
1332 newstate = PTHREAD_CANCEL_DISABLE;
1335 pthread_setcancelstate(newstate, &oldstate);
1337 if ( old_state != NULL ) {
1338 if ( oldstate == PTHREAD_CANCEL_DISABLE ) {
bool operator==(const Thread &thread)
Check if two threads are the same.
Thread(const char *name)
Constructor.
void add_notification_listener(ThreadNotificationListener *notification_listener)
Add notification listener.
OpMode opmode() const
Get operation mode.
Wait until a given condition holds.
bool detached() const
Check if thread has been detached.
virtual void once()
Execute an action exactly once.
void unset_flag(uint32_t flag)
Unset flag.
bool finalize_prepared
True if prepare_finalize() has been called and was not stopped with a cancel_finalize(), false otherwise.
pthread_t thread_id() const
Get ID of thread.
void unlock()
Unlock the mutex.
virtual void wait()
Wait for other threads.
bool started() const
Check if thread has been started.
void wake_all()
Wake up all waiting threads.
virtual ~Thread()
Virtual destructor.
virtual void run()
Code to execute in the thread.
OpMode
Thread operation mode.
Thread notification listener interface.
void cancel_finalize()
Cancel finalization.
bool wakeup_pending()
Check if wakeups are pending.
thread cannot be cancelled
A NULL pointer was supplied where not allowed.
Thread class encapsulation of pthreads.
static void init_main()
Initialize Thread wrapper instance for main thread.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Mutex * loop_mutex
Mutex that is used to protect a call to loop().
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
static void set_cancel_state(CancelState new_state, CancelState *old_state=0)
Set the cancel state of the current thread.
static Thread * current_thread_noexc()
Similar to current_thread, but does never throw an exception.
void wait_loop_done()
Wait for the current loop iteration to finish.
Mutex * loopinterrupt_antistarve_mutex
Mutex to avoid starvation when trying to lock loop_mutex.
bool flagged_bad() const
Check if FLAG_BAD was set.
bool waiting() const
Check if thread is currently waiting for wakeup.
void wakeup()
Wake up thread.
void set_name(const char *format,...)
Set name of thread.
Base class for exceptions in Fawkes.
static pthread_t current_thread_id()
Get the ID of the currently running thread.
static void destroy_main()
Destroy main thread wrapper instance.
virtual void finalize()
Finalize the thread.
bool cancelled() const
Check if thread has been cancelled.
bool running() const
Check if the thread is running.
bool prepare_finalize()
Prepare finalization.
operate in continuous mode (default)
void set_delete_on_exit(bool del)
Set whether the thread should be deleted on exit.
void set_opmode(OpMode op_mode)
Set operation mode.
static Thread * current_thread()
Get the Thread instance of the currently running thread.
void remove_notification_listener(ThreadNotificationListener *notification_listener)
Remove notification listener.
static const unsigned int FLAG_BAD
Standard thread flag: "thread is bad".
void set_coalesce_wakeups(bool coalesce=true)
Set wakeup coalescing.
bool try_lock()
Tries to lock the mutex.
void wait()
Wait for the condition forever.
const char * name() const
Get name of thread.
void cancel()
Cancel a thread.
void kill(int sig)
Send signal to a thread.
void test_cancel()
Set cancellation point.
virtual void loop()
Code to execute in the thread.
void yield()
Yield the processor to another thread or process.
Thread cannot be finalized.
void detach()
Detach the thread.
void join()
Join the thread.
void set_prepfin_hold(bool hold)
Hold prepare_finalize().
void lock()
Lock this mutex.
virtual void init()
Initialize the thread.
void set_flag(uint32_t flag)
Set flag for the thread.
operate in wait-for-wakeup mode
Mutex mutual exclusion lock.
void exit()
Exit the thread.
void stopby()
Shortly stop by at the mutex.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
System ran out of memory and desired operation could not be fulfilled.
void set_flags(uint32_t flags)
Set all flags in one go.
void start(bool wait=true)
Call this method to start the thread.