Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * thread_manager.cpp - Thread manager 00004 * 00005 * Created: Thu Nov 3 19:11:31 2006 (on train to Cologne) 00006 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <mainapp/thread_manager.h> 00025 #include <core/threading/thread.h> 00026 #include <core/threading/mutex_locker.h> 00027 #include <core/threading/wait_condition.h> 00028 #include <core/threading/thread_initializer.h> 00029 #include <core/threading/thread_finalizer.h> 00030 #include <core/exceptions/software.h> 00031 #include <core/exceptions/system.h> 00032 00033 #include <aspect/blocked_timing.h> 00034 00035 using namespace fawkes; 00036 00037 /** @class FawkesThreadManager mainapp/thread_manager.h 00038 * Thread Manager. 00039 * This class provides a manager for the threads. Threads are memorized by 00040 * their wakeup hook. When the thread manager is deleted, all threads are 00041 * appropriately cancelled, joined and deleted. Thus the thread manager 00042 * can be used for "garbage collection" of threads. 00043 * 00044 * The thread manager allows easy wakeup of threads of a given wakeup hook. 00045 * 00046 * The thread manager needs a thread initializer. Each thread that is added 00047 * to the thread manager is initialized with this. The runtime type information 00048 * (RTTI) supplied by C++ can be used to initialize threads if appropriate 00049 * (if the thread has certain aspects that need special treatment). 00050 * 00051 * @author Tim Niemueller 00052 */ 00053 00054 FawkesThreadManager::FawkesThreadManagerAspectCollector::FawkesThreadManagerAspectCollector(FawkesThreadManager *parent_manager) 00055 { 00056 __parent_manager = parent_manager; 00057 } 00058 00059 00060 void 00061 FawkesThreadManager::FawkesThreadManagerAspectCollector::add(ThreadList &tl) 00062 { 00063 BlockedTimingAspect *timed_thread; 00064 00065 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) { 00066 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) { 00067 throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect"); 00068 } 00069 } 00070 00071 __parent_manager->add_maybelocked(tl, /* lock */ false); 00072 } 00073 00074 00075 void 00076 FawkesThreadManager::FawkesThreadManagerAspectCollector::add(Thread *t) 00077 { 00078 BlockedTimingAspect *timed_thread; 00079 00080 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) { 00081 throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect"); 00082 } 00083 00084 __parent_manager->add_maybelocked(t, /* lock */ false); 00085 } 00086 00087 00088 void 00089 FawkesThreadManager::FawkesThreadManagerAspectCollector::remove(ThreadList &tl) 00090 { 00091 BlockedTimingAspect *timed_thread; 00092 00093 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) { 00094 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) { 00095 throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect"); 00096 } 00097 } 00098 00099 __parent_manager->remove_maybelocked(tl, /* lock */ false); 00100 } 00101 00102 00103 void 00104 FawkesThreadManager::FawkesThreadManagerAspectCollector::remove(Thread *t) 00105 { 00106 BlockedTimingAspect *timed_thread; 00107 00108 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) { 00109 throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect"); 00110 } 00111 00112 __parent_manager->remove_maybelocked(t, /* lock */ false); 00113 } 00114 00115 00116 void 00117 FawkesThreadManager::FawkesThreadManagerAspectCollector::force_remove(fawkes::ThreadList &tl) 00118 { 00119 throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads"); 00120 } 00121 00122 void 00123 FawkesThreadManager::FawkesThreadManagerAspectCollector::force_remove(fawkes::Thread *t) 00124 { 00125 throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads"); 00126 } 00127 00128 00129 /** Constructor. 00130 */ 00131 FawkesThreadManager::FawkesThreadManager() 00132 { 00133 initializer = NULL; 00134 finalizer = NULL; 00135 threads.clear(); 00136 waitcond_timedthreads = new WaitCondition(); 00137 __interrupt_timed_thread_wait = false; 00138 __aspect_collector = new FawkesThreadManagerAspectCollector(this); 00139 } 00140 00141 00142 /** Destructor. */ 00143 FawkesThreadManager::~FawkesThreadManager() 00144 { 00145 // stop all threads, we call finalize, and we run through it as long as there are 00146 // still running threads, after that, we force the thread's death. 00147 for (tit = threads.begin(); tit != threads.end(); ++tit) { 00148 (*tit).second.force_stop(finalizer); 00149 } 00150 untimed_threads.force_stop(finalizer); 00151 threads.clear(); 00152 00153 delete waitcond_timedthreads; 00154 delete __aspect_collector; 00155 } 00156 00157 00158 /** Set initializer/finalizer. 00159 * This method has to be called before any thread is added/removed. 00160 * @param initializer thread initializer 00161 * @param finalizer thread finalizer 00162 */ 00163 void 00164 FawkesThreadManager::set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer) 00165 { 00166 this->initializer = initializer; 00167 this->finalizer = finalizer; 00168 } 00169 00170 00171 /** Remove the given thread from internal structures. 00172 * Thread is removed from the internal structures. If the thread has the 00173 * BlockedTimingAspect then the hook is added to the changed list. 00174 * 00175 * @param t thread to remove 00176 * @param changed list of changed hooks, appropriate hook is added if necessary 00177 */ 00178 void 00179 FawkesThreadManager::internal_remove_thread(Thread *t) 00180 { 00181 BlockedTimingAspect *timed_thread; 00182 00183 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) { 00184 // find thread and remove 00185 BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook(); 00186 if ( threads.find(hook) != threads.end() ) { 00187 threads[hook].remove_locked(t); 00188 if (threads[hook].empty()) threads.erase(hook); 00189 } 00190 } else { 00191 untimed_threads.remove_locked(t); 00192 } 00193 } 00194 00195 00196 /** Add the given thread to internal structures. 00197 * Thread is added to the internal structures. If the thread has the 00198 * BlockedTimingAspect then the hook is added to the changed list. 00199 * 00200 * @param t thread to add 00201 * @param changed list of changed hooks, appropriate hook is added if necessary 00202 */ 00203 void 00204 FawkesThreadManager::internal_add_thread(Thread *t) 00205 { 00206 BlockedTimingAspect *timed_thread; 00207 if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) { 00208 BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook(); 00209 00210 if ( threads.find(hook) == threads.end() ) { 00211 threads[hook].set_name("FawkesThreadManagerList Hook %i", hook); 00212 threads[hook].set_maintain_barrier(true); 00213 } 00214 threads[hook].push_back_locked(t); 00215 00216 waitcond_timedthreads->wake_all(); 00217 } else { 00218 untimed_threads.push_back_locked(t); 00219 } 00220 } 00221 00222 00223 /** Add threads. 00224 * Add the given threads to the thread manager. The threads are initialised 00225 * as appropriate and started. See the class documentation for supported 00226 * specialisations of threads and the performed initialisation steps. 00227 * If the thread initializer cannot initalize one or more threads no thread 00228 * is added. In this regard the operation is atomic, either all threads are 00229 * added or none. 00230 * @param tl thread list with threads to add 00231 * @exception CannotInitializeThreadException thrown if at least one of the 00232 * threads could not be initialised 00233 */ 00234 void 00235 FawkesThreadManager::add_maybelocked(ThreadList &tl, bool lock) 00236 { 00237 if ( ! (initializer && finalizer) ) { 00238 throw NullPointerException("FawkesThreadManager: initializer/finalizer not set"); 00239 } 00240 00241 if ( tl.sealed() ) { 00242 throw Exception("Not accepting new threads from list that is not fresh, " 00243 "list '%s' already sealed", tl.name()); 00244 } 00245 00246 tl.lock(); 00247 00248 // Try to initialise all threads 00249 try { 00250 tl.init(initializer, finalizer); 00251 } catch (Exception &e) { 00252 tl.unlock(); 00253 throw; 00254 } 00255 00256 tl.seal(); 00257 tl.start(); 00258 00259 // All thread initialized, now add threads to internal structure 00260 MutexLocker locker(threads.mutex(), lock); 00261 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) { 00262 internal_add_thread(*i); 00263 } 00264 00265 tl.unlock(); 00266 } 00267 00268 00269 /** Add one thread. 00270 * Add the given thread to the thread manager. The thread is initialized 00271 * as appropriate and started. See the class documentation for supported 00272 * specialisations of threads and the performed initialisation steps. 00273 * If the thread initializer cannot initalize the thread it is not added. 00274 * @param thread thread to add 00275 * @param lock if true the environment is locked before adding the thread 00276 * @exception CannotInitializeThreadException thrown if at least the 00277 * thread could not be initialised 00278 */ 00279 void 00280 FawkesThreadManager::add_maybelocked(Thread *thread, bool lock) 00281 { 00282 if ( thread == NULL ) { 00283 throw NullPointerException("FawkesThreadMananger: cannot add NULL as thread"); 00284 } 00285 00286 if ( ! (initializer && finalizer) ) { 00287 throw NullPointerException("FawkesThreadManager: initializer/finalizer not set"); 00288 } 00289 00290 try { 00291 initializer->init(thread); 00292 } catch (CannotInitializeThreadException &e) { 00293 e.append("Adding thread in FawkesThreadManager failed"); 00294 throw; 00295 } 00296 00297 thread->start(); 00298 MutexLocker locker(threads.mutex(), lock); 00299 internal_add_thread(thread); 00300 } 00301 00302 00303 /** Remove the given threads. 00304 * The thread manager tries to finalize and stop the threads and then removes the 00305 * threads from the internal structures. 00306 * 00307 * This may fail if at least one thread of the given list cannot be finalized, for 00308 * example if prepare_finalize() returns false or if the thread finalizer cannot 00309 * finalize the thread. In this case a CannotFinalizeThreadException is thrown. 00310 * 00311 * @param tl threads to remove. 00312 * @exception CannotFinalizeThreadException At least one thread cannot be safely 00313 * finalized 00314 * @exception ThreadListNotSealedException if the given thread lits tl is not 00315 * sealed the thread manager will refuse to remove it 00316 */ 00317 void 00318 FawkesThreadManager::remove_maybelocked(ThreadList &tl, bool lock) 00319 { 00320 if ( ! (initializer && finalizer) ) { 00321 throw NullPointerException("FawkesThreadManager: initializer/finalizer not set"); 00322 } 00323 00324 00325 if ( ! tl.sealed() ) { 00326 throw ThreadListNotSealedException("(FawkesThreadManager) Cannot remove unsealed thread " 00327 "list. Not accepting unsealed list '%s' for removal", 00328 tl.name()); 00329 } 00330 00331 tl.lock(); 00332 MutexLocker locker(threads.mutex(), lock); 00333 00334 try { 00335 if ( ! tl.prepare_finalize(finalizer) ) { 00336 tl.cancel_finalize(); 00337 tl.unlock(); 00338 throw CannotFinalizeThreadException("One or more threads in list '%s' cannot be " 00339 "finalized", tl.name()); 00340 } 00341 } catch (CannotFinalizeThreadException &e) { 00342 tl.unlock(); 00343 throw; 00344 } catch (Exception &e) { 00345 tl.unlock(); 00346 e.append("One or more threads in list '%s' cannot be finalized", tl.name()); 00347 throw CannotFinalizeThreadException(e); 00348 } 00349 00350 tl.stop(); 00351 tl.finalize(finalizer); 00352 00353 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) { 00354 internal_remove_thread(*i); 00355 } 00356 00357 tl.unlock(); 00358 } 00359 00360 00361 /** Remove the given thread. 00362 * The thread manager tries to finalize and stop the thread and then removes the 00363 * thread from the internal structures. 00364 * 00365 * This may fail if the thread cannot be finalized, for 00366 * example if prepare_finalize() returns false or if the thread finalizer cannot 00367 * finalize the thread. In this case a CannotFinalizeThreadException is thrown. 00368 * 00369 * @param thread thread to remove. 00370 * @exception CannotFinalizeThreadException At least one thread cannot be safely 00371 * finalized 00372 */ 00373 void 00374 FawkesThreadManager::remove_maybelocked(Thread *thread, bool lock) 00375 { 00376 if ( thread == NULL ) return; 00377 00378 if ( ! (initializer && finalizer) ) { 00379 throw NullPointerException("FawkesThreadManager: initializer/finalizer not set"); 00380 } 00381 00382 MutexLocker locker(threads.mutex(), lock); 00383 try { 00384 if ( ! thread->prepare_finalize() ) { 00385 thread->cancel_finalize(); 00386 throw CannotFinalizeThreadException("Thread '%s'cannot be finalized", thread->name()); 00387 } 00388 } catch (CannotFinalizeThreadException &e) { 00389 e.append("FawkesThreadManager cannot stop thread '%s'", thread->name()); 00390 thread->cancel_finalize(); 00391 throw; 00392 } 00393 00394 thread->cancel(); 00395 thread->join(); 00396 finalizer->finalize(thread); 00397 thread->finalize(); 00398 00399 internal_remove_thread(thread); 00400 } 00401 00402 00403 00404 00405 /** Force removal of the given threads. 00406 * The thread manager tries to finalize and stop the threads and then removes the 00407 * threads from the internal structures. 00408 * 00409 * This will succeed even if a thread of the given list cannot be finalized, for 00410 * example if prepare_finalize() returns false or if the thread finalizer cannot 00411 * finalize the thread. 00412 * 00413 * <b>Caution, using this function may damage your robot.</b> 00414 * 00415 * @param tl threads to remove. 00416 * @exception ThreadListNotSealedException if the given thread lits tl is not 00417 * sealed the thread manager will refuse to remove it 00418 * The threads are removed from thread manager control. The threads will be stopped 00419 * before they are removed (may cause unpredictable results otherwise). 00420 */ 00421 void 00422 FawkesThreadManager::force_remove(ThreadList &tl) 00423 { 00424 if ( ! tl.sealed() ) { 00425 throw ThreadListNotSealedException("Not accepting unsealed list '%s' for removal", 00426 tl.name()); 00427 } 00428 00429 tl.lock(); 00430 threads.mutex()->stopby(); 00431 tl.force_stop(finalizer); 00432 00433 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) { 00434 internal_remove_thread(*i); 00435 } 00436 00437 tl.unlock(); 00438 } 00439 00440 00441 /** Force removal of the given thread. 00442 * The thread manager tries to finalize and stop the thread and then removes the 00443 * thread from the internal structures. 00444 * 00445 * This will succeed even if the thread cannot be finalized, for 00446 * example if prepare_finalize() returns false or if the thread finalizer cannot 00447 * finalize the thread. 00448 * 00449 * <b>Caution, using this function may damage your robot.</b> 00450 * 00451 * @param thread thread to remove. 00452 * @exception ThreadListNotSealedException if the given thread lits tl is not 00453 * sealed the thread manager will refuse to remove it 00454 * The threads are removed from thread manager control. The threads will be stopped 00455 * before they are removed (may cause unpredictable results otherwise). 00456 */ 00457 void 00458 FawkesThreadManager::force_remove(fawkes::Thread *thread) 00459 { 00460 MutexLocker lock(threads.mutex()); 00461 try { 00462 thread->prepare_finalize(); 00463 } catch (Exception &e) { 00464 // ignore 00465 } 00466 00467 thread->cancel(); 00468 thread->join(); 00469 if (finalizer) finalizer->finalize(thread); 00470 thread->finalize(); 00471 00472 internal_remove_thread(thread); 00473 } 00474 00475 00476 void 00477 FawkesThreadManager::wakeup_and_wait(BlockedTimingAspect::WakeupHook hook, 00478 unsigned int timeout_usec) 00479 { 00480 MutexLocker lock(threads.mutex()); 00481 00482 unsigned int timeout_sec = 0; 00483 if (timeout_usec >= 1000000) { 00484 timeout_sec = timeout_usec / 1000000; 00485 timeout_usec -= timeout_sec * 1000000; 00486 } 00487 00488 // Note that the following lines might throw an exception, we just pass it on 00489 if ( threads.find(hook) != threads.end() ) { 00490 threads[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000); 00491 } 00492 } 00493 00494 00495 void 00496 FawkesThreadManager::wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier) 00497 { 00498 MutexLocker lock(threads.mutex()); 00499 00500 if ( threads.find(hook) != threads.end() ) { 00501 if ( barrier ) { 00502 threads[hook].wakeup(barrier); 00503 } else { 00504 threads[hook].wakeup(); 00505 } 00506 if ( threads[hook].size() == 0 ) { 00507 threads.erase(hook); 00508 } 00509 } 00510 } 00511 00512 00513 void 00514 FawkesThreadManager::try_recover(std::list<std::string> &recovered_threads) 00515 { 00516 threads.lock(); 00517 for (tit = threads.begin(); tit != threads.end(); ++tit) { 00518 tit->second.try_recover(recovered_threads); 00519 } 00520 threads.unlock(); 00521 } 00522 00523 00524 bool 00525 FawkesThreadManager::timed_threads_exist() 00526 { 00527 return (threads.size() > 0); 00528 } 00529 00530 00531 void 00532 FawkesThreadManager::wait_for_timed_threads() 00533 { 00534 __interrupt_timed_thread_wait = false; 00535 waitcond_timedthreads->wait(); 00536 if ( __interrupt_timed_thread_wait ) { 00537 __interrupt_timed_thread_wait = false; 00538 throw InterruptedException("Waiting for timed threads was interrupted"); 00539 } 00540 } 00541 00542 void 00543 FawkesThreadManager::interrupt_timed_thread_wait() 00544 { 00545 __interrupt_timed_thread_wait = true; 00546 waitcond_timedthreads->wake_all(); 00547 } 00548 00549 00550 00551 /** Get a thread collector to be used for an aspect initializer. 00552 * @return thread collector instance to use for ThreadProducerAspect. 00553 */ 00554 ThreadCollector * 00555 FawkesThreadManager::aspect_collector() const 00556 { 00557 return __aspect_collector; 00558 }