24 #include <core/exceptions/software.h> 25 #include <core/exceptions/system.h> 26 #include <core/threading/barrier.h> 27 #include <core/threading/mutex.h> 28 #include <core/threading/mutex_locker.h> 29 #include <core/threading/read_write_lock.h> 30 #include <core/threading/thread.h> 31 #include <core/threading/thread_finalizer.h> 32 #include <core/threading/thread_loop_listener.h> 33 #include <core/threading/thread_notification_listener.h> 34 #include <core/threading/wait_condition.h> 35 #include <core/utils/lock_list.h> 37 #if defined(gnu_linux___) && !defined(_GNU_SOURCE) 188 pthread_mutex_t Thread::thread_key_mutex_ = PTHREAD_MUTEX_INITIALIZER;
191 pthread_key_t Thread::THREAD_KEY = PTHREAD_KEYS_MAX;
193 #define MAIN_THREAD_NAME "MainThread___" 217 __constructor(
name, op_mode);
240 Thread::__constructor(
const char *name, OpMode op_mode)
244 prepfin_conc_loop_ =
false;
245 coalesce_wakeups_ =
false;
247 name_ = strdup(
name);
248 notification_listeners_ =
new LockList<ThreadNotificationListener *>();
249 loop_listeners_ =
new LockList<ThreadLoopListener *>();
252 sleep_mutex_ =
new Mutex();
253 sleep_condition_ =
new WaitCondition(sleep_mutex_);
254 waiting_for_wakeup_ =
true;
256 sleep_condition_ = NULL;
258 waiting_for_wakeup_ =
false;
266 delete_on_exit_ =
false;
267 prepfin_hold_ =
false;
268 pending_wakeups_ = 0;
274 loop_done_mutex_ =
new Mutex();
275 loop_done_waitcond_ =
new WaitCondition(loop_done_mutex_);
278 prepfin_hold_mutex_ =
new Mutex();
279 prepfin_hold_waitcond_ =
new WaitCondition(prepfin_hold_mutex_);
280 startup_barrier_ =
new Barrier(2);
289 delete sleep_condition_;
293 delete notification_listeners_;
294 delete loop_listeners_;
296 delete startup_barrier_;
297 delete prepfin_hold_mutex_;
298 delete prepfin_hold_waitcond_;
299 delete loop_done_waitcond_;
300 delete loop_done_mutex_;
312 throw Exception(
"You may not use copy constructor of class Thread");
320 Thread::operator=(
const Thread &t)
322 throw Exception(
"You may not use assignment operator of class Thread");
383 prepfin_hold_mutex_->
lock();
384 while (prepfin_hold_) {
385 prepfin_hold_waitcond_->
wait();
387 if (!prepfin_conc_loop_) {
393 if (!prepfin_conc_loop_) {
397 prepfin_hold_mutex_->
unlock();
503 throw Exception(
"You cannot start the same thread twice!");
511 if ((err = pthread_create(&thread_id_, NULL, Thread::entry,
this)) != 0) {
513 throw Exception(
"Could not start thread", err);
515 #if defined(_GNU_SOURCE) && defined(GLIBC___) \ 516 && ((GLIBC___ == 2 && GLIBC_MINOR___ >= 12) || GLIBC___ > 2) 518 strncpy(tmpname, name_, 15);
520 pthread_setname_np(thread_id_, tmpname);
524 startup_barrier_->
wait();
528 Thread::lock_sleep_mutex()
531 sleep_mutex_->
lock();
541 Thread::entry(
void *pthis)
549 set_tsd_thread_instance(t);
552 t->lock_sleep_mutex();
555 t->notify_of_startup();
559 t->startup_barrier_->wait();
562 t->loop_mutex->lock();
564 t->loop_mutex->unlock();
584 if (delete_on_exit_) {
588 waiting_for_wakeup_ =
false;
601 pthread_join(thread_id_, &dont_care);
604 if (sleep_mutex_ != NULL) {
626 loop_listeners_->try_lock();
627 loop_listeners_->unlock();
639 pthread_detach(thread_id_);
648 if (started_ && !cancelled_) {
649 if (pthread_cancel(thread_id_) == 0) {
650 waiting_for_wakeup_ =
false;
664 pthread_kill(thread_id_, sig);
685 throw Exception(
"Cannot set thread opmode while running");
690 delete sleep_condition_;
692 sleep_condition_ = NULL;
695 sleep_mutex_ =
new Mutex();
718 prepfin_conc_loop_ = concurrent;
733 coalesce_wakeups_ = coalesce;
737 coalesce_wakeups_ = coalesce;
751 va_start(va, format);
752 char *old_name = name_;
753 if (vasprintf(&name_, format, va) == -1) {
760 #if defined(_GNU_SOURCE) && defined(GLIBC___) \ 761 && ((GLIBC___ == 2 && GLIBC_MINOR___ >= 12) || GLIBC___ > 2) 764 strncpy(tmpname, name_, 15);
766 pthread_setname_np(thread_id_, tmpname);
783 prepfin_hold_mutex_->
lock();
785 prepfin_hold_mutex_->
unlock();
786 throw Exception(
"Thread(%s)::set_prepfin_hold: prepare_finalize() has " 787 "been called already()",
790 prepfin_hold_ = hold;
794 prepfin_hold_mutex_->
unlock();
863 return waiting_for_wakeup_;
873 pthread_testcancel();
899 return (pthread_equal(thread_id_, thread.thread_id_) != 0);
923 while (pending_wakeups_ == 0) {
924 waiting_for_wakeup_ =
true;
925 sleep_condition_->
wait();
927 pending_wakeups_ -= 1;
938 loop_listeners_->lock();
940 it != loop_listeners_->end();
942 (*it)->pre_loop(
this);
944 loop_listeners_->unlock();
950 loop_listeners_->lock();
952 it != loop_listeners_->rend();
954 (*it)->post_loop(
this);
956 loop_listeners_->unlock();
959 loop_done_mutex_->
lock();
961 loop_done_mutex_->
unlock();
967 sleep_mutex_->
lock();
974 sleep_mutex_->
lock();
976 sleep_mutex_->
lock();
979 while (pending_wakeups_ == 0) {
980 waiting_for_wakeup_ =
true;
981 sleep_condition_->
wait();
983 pending_wakeups_ -= 1;
1001 throw Exception(
"Thread(%s): wakeup() cannot be called if loop is running " 1002 "with barrier already",
1006 if (coalesce_wakeups_)
1007 pending_wakeups_ = 1;
1009 pending_wakeups_ += 1;
1010 if (waiting_for_wakeup_) {
1012 waiting_for_wakeup_ =
false;
1029 if (barrier == NULL) {
1034 if (!waiting_for_wakeup_ && barrier_) {
1035 throw Exception(
"Thread %s already running with barrier, cannot wakeup %i %p",
1037 waiting_for_wakeup_,
1041 pending_wakeups_ += 1;
1043 if (waiting_for_wakeup_) {
1045 waiting_for_wakeup_ =
false;
1054 loop_done_mutex_->
lock();
1055 while (!loop_done_) {
1056 loop_done_waitcond_->
wait();
1058 loop_done_mutex_->
unlock();
1069 if (delete_on_exit_) {
1098 delete_on_exit_ = del;
1109 return (pending_wakeups_ > 0);
1135 flags_ &= 0xFFFFFFFF ^ flag;
1164 notification_listeners_->push_back_locked(notification_listener);
1173 notification_listeners_->remove_locked(notification_listener);
1183 loop_listeners_->push_back_locked(loop_listener);
1192 loop_listeners_->remove_locked(loop_listener);
1199 Thread::notify_of_startup()
1201 notification_listeners_->lock();
1203 while (i != notification_listeners_->end()) {
1204 if (!(*i)->thread_started(
this)) {
1205 i = notification_listeners_->erase(i);
1210 notification_listeners_->unlock();
1220 notification_listeners_->lock();
1222 while (i != notification_listeners_->end()) {
1223 if (!(*i)->thread_init_failed(
this)) {
1224 i = notification_listeners_->erase(i);
1229 notification_listeners_->unlock();
1236 Thread::init_thread_key()
1238 pthread_mutex_lock(&thread_key_mutex_);
1239 if (THREAD_KEY == PTHREAD_KEYS_MAX) {
1242 if ((err = pthread_key_create(&THREAD_KEY, NULL)) != 0) {
1243 if (ENOMEM == err) {
1245 "specific data (reference to thread)");
1247 throw Exception(
"Thread key for reference to thread could not be created", err);
1251 pthread_mutex_unlock(&thread_key_mutex_);
1260 Thread::set_tsd_thread_instance(Thread *t)
1263 if ((err = pthread_setspecific(THREAD_KEY, t)) != 0) {
1264 if (ENOMEM == err) {
1265 throw OutOfMemoryException(
"Could not set specific data (reference to thread)");
1267 throw Exception(
"Could not set specific data (reference to thread), unknown reason");
1279 Thread *t =
new Thread(MAIN_THREAD_NAME, pthread_self());
1280 set_tsd_thread_instance(t);
1292 if (strcmp(t->
name(), MAIN_THREAD_NAME) == 0) {
1295 throw Exception(
"Main thread can only be destroyed in main thread");
1307 return pthread_self();
1323 #if defined(_GNU_SOURCE) && defined(GLIBC___) \ 1324 && ((GLIBC___ == 2 && GLIBC_MINOR___ >= 12) || GLIBC___ > 2) 1327 if (pthread_getname_np(pthread_self(),
name, 16) == 0) {
1347 return t->
set_name(
"%s", thread_name.c_str());
1348 #if defined(_GNU_SOURCE) && defined(GLIBC___) \ 1349 && ((GLIBC___ == 2 && GLIBC_MINOR___ >= 12) || GLIBC___ > 2) 1351 pthread_setname_np(pthread_self(), thread_name.c_str());
1368 if (THREAD_KEY == PTHREAD_KEYS_MAX) {
1369 throw Exception(
"No thread has been initialized");
1371 return (
Thread *)pthread_getspecific(THREAD_KEY);
1383 if (THREAD_KEY == PTHREAD_KEYS_MAX) {
1386 return (
Thread *)pthread_getspecific(THREAD_KEY);
1398 int oldstate = PTHREAD_CANCEL_ENABLE;
1399 int newstate = PTHREAD_CANCEL_ENABLE;
1401 newstate = PTHREAD_CANCEL_DISABLE;
1404 pthread_setcancelstate(newstate, &oldstate);
1406 if (old_state != NULL) {
1407 if (oldstate == PTHREAD_CANCEL_DISABLE) {
bool operator==(const Thread &thread)
Check if two threads are the same.
Thread(const char *name)
Constructor.
void add_notification_listener(ThreadNotificationListener *notification_listener)
Add notification listener.
Wait until a given condition holds.
static std::string current_thread_name()
Get the name of the current thread.
virtual void once()
Execute an action exactly once.
void unset_flag(uint32_t flag)
Unset flag.
bool finalize_prepared
True if prepare_finalize() has been called and was not stopped with a cancel_finalize(),...
Fawkes library namespace.
void unlock()
Unlock the mutex.
virtual void wait()
Wait for other threads.
void wake_all()
Wake up all waiting threads.
virtual ~Thread()
Virtual destructor.
virtual void run()
Code to execute in the thread.
OpMode
Thread operation mode.
bool running() const
Check if the thread is running.
Thread notification listener interface.
void cancel_finalize()
Cancel finalization.
bool flagged_bad() const
Check if FLAG_BAD was set.
bool wakeup_pending()
Check if wakeups are pending.
thread cannot be cancelled
A NULL pointer was supplied where not allowed.
Thread class encapsulation of pthreads.
bool waiting() const
Check if thread is currently waiting for wakeup.
static void init_main()
Initialize Thread wrapper instance for main thread.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Mutex * loop_mutex
Mutex that is used to protect a call to loop().
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
static void set_cancel_state(CancelState new_state, CancelState *old_state=0)
Set the cancel state of the current thread.
static Thread * current_thread_noexc()
Similar to current_thread, but does never throw an exception.
bool cancelled() const
Check if thread has been cancelled.
void wait_loop_done()
Wait for the current loop iteration to finish.
OpMode opmode() const
Get operation mode.
Mutex * loopinterrupt_antistarve_mutex
Mutex to avoid starvation when trying to lock loop_mutex.
void add_loop_listener(ThreadLoopListener *loop_listener)
Add loop listener.
void wakeup()
Wake up thread.
void set_name(const char *format,...)
Set name of thread.
Base class for exceptions in Fawkes.
static pthread_t current_thread_id()
Get the ID of the currently running thread.
static void destroy_main()
Destroy main thread wrapper instance.
virtual void finalize()
Finalize the thread.
bool started() const
Check if thread has been started.
bool prepare_finalize()
Prepare finalization.
operate in continuous mode (default)
void set_delete_on_exit(bool del)
Set whether the thread should be deleted on exit.
void set_opmode(OpMode op_mode)
Set operation mode.
static Thread * current_thread()
Get the Thread instance of the currently running thread.
void remove_notification_listener(ThreadNotificationListener *notification_listener)
Remove notification listener.
const char * name() const
Get name of thread.
void notify_of_failed_init()
Notify of failed init.
void remove_loop_listener(ThreadLoopListener *loop_listener)
Remove loop listener.
static const unsigned int FLAG_BAD
Standard thread flag: "thread is bad".
void set_coalesce_wakeups(bool coalesce=true)
Set wakeup coalescing.
bool try_lock()
Tries to lock the mutex.
void wait()
Wait for the condition forever.
void cancel()
Cancel a thread.
void kill(int sig)
Send signal to a thread.
void test_cancel()
Set cancellation point.
bool detached() const
Check if thread has been detached.
virtual void loop()
Code to execute in the thread.
void yield()
Yield the processor to another thread or process.
pthread_t thread_id() const
Get ID of thread.
Thread cannot be finalized.
Thread loop listener interface.
void detach()
Detach the thread.
void join()
Join the thread.
void set_prepfin_hold(bool hold)
Hold prepare_finalize().
void lock()
Lock this mutex.
virtual void init()
Initialize the thread.
void set_flag(uint32_t flag)
Set flag for the thread.
operate in wait-for-wakeup mode
Mutex mutual exclusion lock.
void exit()
Exit the thread.
void stopby()
Shortly stop by at the mutex.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
System ran out of memory and desired operation could not be fulfilled.
void set_flags(uint32_t flags)
Set all flags in one go.
void start(bool wait=true)
Call this method to start the thread.