thread_manager.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <mainapp/thread_manager.h>
00025 #include <core/threading/thread.h>
00026 #include <core/threading/mutex_locker.h>
00027 #include <core/threading/wait_condition.h>
00028 #include <core/threading/thread_initializer.h>
00029 #include <core/threading/thread_finalizer.h>
00030 #include <core/exceptions/software.h>
00031 #include <core/exceptions/system.h>
00032
00033 #include <aspect/blocked_timing.h>
00034
00035 using namespace fawkes;
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054 FawkesThreadManager::FawkesThreadManagerAspectCollector::FawkesThreadManagerAspectCollector(FawkesThreadManager *parent_manager)
00055 {
00056 __parent_manager = parent_manager;
00057 }
00058
00059
00060 void
00061 FawkesThreadManager::FawkesThreadManagerAspectCollector::add(ThreadList &tl)
00062 {
00063 BlockedTimingAspect *timed_thread;
00064
00065 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00066 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
00067 throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
00068 }
00069 }
00070
00071 __parent_manager->add_maybelocked(tl, false);
00072 }
00073
00074
00075 void
00076 FawkesThreadManager::FawkesThreadManagerAspectCollector::add(Thread *t)
00077 {
00078 BlockedTimingAspect *timed_thread;
00079
00080 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00081 throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
00082 }
00083
00084 __parent_manager->add_maybelocked(t, false);
00085 }
00086
00087
00088 void
00089 FawkesThreadManager::FawkesThreadManagerAspectCollector::remove(ThreadList &tl)
00090 {
00091 BlockedTimingAspect *timed_thread;
00092
00093 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00094 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
00095 throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
00096 }
00097 }
00098
00099 __parent_manager->remove_maybelocked(tl, false);
00100 }
00101
00102
00103 void
00104 FawkesThreadManager::FawkesThreadManagerAspectCollector::remove(Thread *t)
00105 {
00106 BlockedTimingAspect *timed_thread;
00107
00108 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00109 throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
00110 }
00111
00112 __parent_manager->remove_maybelocked(t, false);
00113 }
00114
00115
00116 void
00117 FawkesThreadManager::FawkesThreadManagerAspectCollector::force_remove(fawkes::ThreadList &tl)
00118 {
00119 throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
00120 }
00121
00122 void
00123 FawkesThreadManager::FawkesThreadManagerAspectCollector::force_remove(fawkes::Thread *t)
00124 {
00125 throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
00126 }
00127
00128
00129
00130
00131 FawkesThreadManager::FawkesThreadManager()
00132 {
00133 initializer = NULL;
00134 finalizer = NULL;
00135 threads.clear();
00136 waitcond_timedthreads = new WaitCondition();
00137 __interrupt_timed_thread_wait = false;
00138 __aspect_collector = new FawkesThreadManagerAspectCollector(this);
00139 }
00140
00141
00142
00143 FawkesThreadManager::~FawkesThreadManager()
00144 {
00145
00146
00147 for (tit = threads.begin(); tit != threads.end(); ++tit) {
00148 (*tit).second.force_stop(finalizer);
00149 }
00150 untimed_threads.force_stop(finalizer);
00151 threads.clear();
00152
00153 delete waitcond_timedthreads;
00154 delete __aspect_collector;
00155 }
00156
00157
00158
00159
00160
00161
00162
00163 void
00164 FawkesThreadManager::set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
00165 {
00166 this->initializer = initializer;
00167 this->finalizer = finalizer;
00168 }
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178 void
00179 FawkesThreadManager::internal_remove_thread(Thread *t)
00180 {
00181 BlockedTimingAspect *timed_thread;
00182
00183 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00184
00185 BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook();
00186 if ( threads.find(hook) != threads.end() ) {
00187 threads[hook].remove_locked(t);
00188 if (threads[hook].empty()) threads.erase(hook);
00189 }
00190 } else {
00191 untimed_threads.remove_locked(t);
00192 }
00193 }
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203 void
00204 FawkesThreadManager::internal_add_thread(Thread *t)
00205 {
00206 BlockedTimingAspect *timed_thread;
00207 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00208 BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook();
00209
00210 if ( threads.find(hook) == threads.end() ) {
00211 threads[hook].set_name("FawkesThreadManagerList Hook %i", hook);
00212 threads[hook].set_maintain_barrier(true);
00213 }
00214 threads[hook].push_back_locked(t);
00215
00216 waitcond_timedthreads->wake_all();
00217 } else {
00218 untimed_threads.push_back_locked(t);
00219 }
00220 }
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234 void
00235 FawkesThreadManager::add_maybelocked(ThreadList &tl, bool lock)
00236 {
00237 if ( ! (initializer && finalizer) ) {
00238 throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00239 }
00240
00241 if ( tl.sealed() ) {
00242 throw Exception("Not accepting new threads from list that is not fresh, "
00243 "list '%s' already sealed", tl.name());
00244 }
00245
00246 tl.lock();
00247
00248
00249 try {
00250 tl.init(initializer, finalizer);
00251 } catch (Exception &e) {
00252 tl.unlock();
00253 throw;
00254 }
00255
00256 tl.seal();
00257 tl.start();
00258
00259
00260 MutexLocker locker(threads.mutex(), lock);
00261 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00262 internal_add_thread(*i);
00263 }
00264
00265 tl.unlock();
00266 }
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279 void
00280 FawkesThreadManager::add_maybelocked(Thread *thread, bool lock)
00281 {
00282 if ( thread == NULL ) {
00283 throw NullPointerException("FawkesThreadMananger: cannot add NULL as thread");
00284 }
00285
00286 if ( ! (initializer && finalizer) ) {
00287 throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00288 }
00289
00290 try {
00291 initializer->init(thread);
00292 } catch (CannotInitializeThreadException &e) {
00293 e.append("Adding thread in FawkesThreadManager failed");
00294 throw;
00295 }
00296
00297 thread->start();
00298 MutexLocker locker(threads.mutex(), lock);
00299 internal_add_thread(thread);
00300 }
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317 void
00318 FawkesThreadManager::remove_maybelocked(ThreadList &tl, bool lock)
00319 {
00320 if ( ! (initializer && finalizer) ) {
00321 throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00322 }
00323
00324
00325 if ( ! tl.sealed() ) {
00326 throw ThreadListNotSealedException("(FawkesThreadManager) Cannot remove unsealed thread "
00327 "list. Not accepting unsealed list '%s' for removal",
00328 tl.name());
00329 }
00330
00331 tl.lock();
00332 MutexLocker locker(threads.mutex(), lock);
00333
00334 try {
00335 if ( ! tl.prepare_finalize(finalizer) ) {
00336 tl.cancel_finalize();
00337 tl.unlock();
00338 throw CannotFinalizeThreadException("One or more threads in list '%s' cannot be "
00339 "finalized", tl.name());
00340 }
00341 } catch (CannotFinalizeThreadException &e) {
00342 tl.unlock();
00343 throw;
00344 } catch (Exception &e) {
00345 tl.unlock();
00346 e.append("One or more threads in list '%s' cannot be finalized", tl.name());
00347 throw CannotFinalizeThreadException(e);
00348 }
00349
00350 tl.stop();
00351 tl.finalize(finalizer);
00352
00353 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00354 internal_remove_thread(*i);
00355 }
00356
00357 tl.unlock();
00358 }
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373 void
00374 FawkesThreadManager::remove_maybelocked(Thread *thread, bool lock)
00375 {
00376 if ( thread == NULL ) return;
00377
00378 if ( ! (initializer && finalizer) ) {
00379 throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00380 }
00381
00382 MutexLocker locker(threads.mutex(), lock);
00383 try {
00384 if ( ! thread->prepare_finalize() ) {
00385 thread->cancel_finalize();
00386 throw CannotFinalizeThreadException("Thread '%s'cannot be finalized", thread->name());
00387 }
00388 } catch (CannotFinalizeThreadException &e) {
00389 e.append("FawkesThreadManager cannot stop thread '%s'", thread->name());
00390 thread->cancel_finalize();
00391 throw;
00392 }
00393
00394 thread->cancel();
00395 thread->join();
00396 finalizer->finalize(thread);
00397 thread->finalize();
00398
00399 internal_remove_thread(thread);
00400 }
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421 void
00422 FawkesThreadManager::force_remove(ThreadList &tl)
00423 {
00424 if ( ! tl.sealed() ) {
00425 throw ThreadListNotSealedException("Not accepting unsealed list '%s' for removal",
00426 tl.name());
00427 }
00428
00429 tl.lock();
00430 threads.mutex()->stopby();
00431 tl.force_stop(finalizer);
00432
00433 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00434 internal_remove_thread(*i);
00435 }
00436
00437 tl.unlock();
00438 }
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457 void
00458 FawkesThreadManager::force_remove(fawkes::Thread *thread)
00459 {
00460 MutexLocker lock(threads.mutex());
00461 try {
00462 thread->prepare_finalize();
00463 } catch (Exception &e) {
00464
00465 }
00466
00467 thread->cancel();
00468 thread->join();
00469 if (finalizer) finalizer->finalize(thread);
00470 thread->finalize();
00471
00472 internal_remove_thread(thread);
00473 }
00474
00475
00476 void
00477 FawkesThreadManager::wakeup_and_wait(BlockedTimingAspect::WakeupHook hook,
00478 unsigned int timeout_usec)
00479 {
00480 MutexLocker lock(threads.mutex());
00481
00482 unsigned int timeout_sec = 0;
00483 if (timeout_usec >= 1000000) {
00484 timeout_sec = timeout_usec / 1000000;
00485 timeout_usec -= timeout_sec * 1000000;
00486 }
00487
00488
00489 if ( threads.find(hook) != threads.end() ) {
00490 threads[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
00491 }
00492 }
00493
00494
00495 void
00496 FawkesThreadManager::wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier)
00497 {
00498 MutexLocker lock(threads.mutex());
00499
00500 if ( threads.find(hook) != threads.end() ) {
00501 if ( barrier ) {
00502 threads[hook].wakeup(barrier);
00503 } else {
00504 threads[hook].wakeup();
00505 }
00506 if ( threads[hook].size() == 0 ) {
00507 threads.erase(hook);
00508 }
00509 }
00510 }
00511
00512
00513 void
00514 FawkesThreadManager::try_recover(std::list<std::string> &recovered_threads)
00515 {
00516 threads.lock();
00517 for (tit = threads.begin(); tit != threads.end(); ++tit) {
00518 tit->second.try_recover(recovered_threads);
00519 }
00520 threads.unlock();
00521 }
00522
00523
00524 bool
00525 FawkesThreadManager::timed_threads_exist()
00526 {
00527 return (threads.size() > 0);
00528 }
00529
00530
00531 void
00532 FawkesThreadManager::wait_for_timed_threads()
00533 {
00534 __interrupt_timed_thread_wait = false;
00535 waitcond_timedthreads->wait();
00536 if ( __interrupt_timed_thread_wait ) {
00537 __interrupt_timed_thread_wait = false;
00538 throw InterruptedException("Waiting for timed threads was interrupted");
00539 }
00540 }
00541
00542 void
00543 FawkesThreadManager::interrupt_timed_thread_wait()
00544 {
00545 __interrupt_timed_thread_wait = true;
00546 waitcond_timedthreads->wake_all();
00547 }
00548
00549
00550
00551
00552
00553
00554 ThreadCollector *
00555 FawkesThreadManager::aspect_collector() const
00556 {
00557 return __aspect_collector;
00558 }