Fawkes API Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * base_thread.cpp - FireVision Base Thread 00004 * 00005 * Created: Tue May 29 16:41:50 2007 00006 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. 00014 * 00015 * This program is distributed in the hope that it will be useful, 00016 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00018 * GNU Library General Public License for more details. 00019 * 00020 * Read the full text in the LICENSE.GPL file in the doc directory. 00021 */ 00022 00023 #include "base_thread.h" 00024 #include "acquisition_thread.h" 00025 #include "aqt_vision_threads.h" 00026 00027 #include <core/threading/thread.h> 00028 #include <core/threading/mutex.h> 00029 #include <core/threading/mutex_locker.h> 00030 #include <core/threading/barrier.h> 00031 #include <utils/logging/logger.h> 00032 00033 #include <fvutils/system/camargp.h> 00034 #include <fvutils/ipc/shm_image.h> 00035 #include <fvutils/ipc/shm_lut.h> 00036 #include <cams/factory.h> 00037 #include <cams/cam_exceptions.h> 00038 #include <cams/control/factory.h> 00039 #include <core/exceptions/software.h> 00040 00041 #include <aspect/vision.h> 00042 00043 #include <algorithm> 00044 #include <unistd.h> 00045 00046 using namespace fawkes; 00047 using namespace firevision; 00048 00049 /** @class FvBaseThread "base_thread.h" 00050 * FireVision base thread. 00051 * This implements the functionality of the FvBasePlugin. 00052 * @author Tim Niemueller 00053 */ 00054 00055 /** Constructor. */ 00056 FvBaseThread::FvBaseThread() 00057 : Thread("FvBaseThread", Thread::OPMODE_WAITFORWAKEUP), 00058 BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR), 00059 VisionMasterAspect(this) 00060 { 00061 // default to 30 seconds 00062 __aqt_timeout = 30; 00063 __aqt_barrier = new Barrier(1); 00064 } 00065 00066 00067 /** Destructor. */ 00068 FvBaseThread::~FvBaseThread() 00069 { 00070 delete __aqt_barrier; 00071 } 00072 00073 00074 void 00075 FvBaseThread::init() 00076 { 00077 // wipe all previously existing FireVision shared memory segments 00078 // that are orphaned 00079 SharedMemoryImageBuffer::cleanup(/* use lister */ false); 00080 SharedMemoryLookupTable::cleanup(/* use lister */ false); 00081 } 00082 00083 00084 void 00085 FvBaseThread::finalize() 00086 { 00087 __aqts.lock(); 00088 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) { 00089 thread_collector->remove(__ait->second); 00090 delete __ait->second; 00091 } 00092 __aqts.clear(); 00093 __aqts.unlock(); 00094 __owned_controls.lock(); 00095 LockList<CameraControl *>::iterator i; 00096 for (i = __owned_controls.begin(); i != __owned_controls.end(); ++i) { 00097 delete *i; 00098 } 00099 __owned_controls.clear(); 00100 __owned_controls.unlock(); 00101 } 00102 00103 00104 /** Thread loop. */ 00105 void 00106 FvBaseThread::loop() 00107 { 00108 __aqts.lock(); 00109 00110 try { 00111 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) { 00112 __ait->second->set_vt_prepfin_hold(true); 00113 } 00114 } catch (Exception &e) { 00115 logger->log_warn(name(), "Cannot get prepfin hold status, skipping this loop"); 00116 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) { 00117 __ait->second->set_vt_prepfin_hold(false); 00118 } 00119 __aqts.unlock(); 00120 return; 00121 } 00122 00123 // Wakeup all cyclic acquisition threads and wait for them 00124 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) { 00125 if ( __ait->second->aqtmode() == FvAcquisitionThread::AqtCyclic ) { 00126 //logger->log_debug(name(), "Waking Thread %s", __ait->second->name()); 00127 __ait->second->wakeup(__aqt_barrier); 00128 } 00129 } 00130 00131 __aqt_barrier->wait(); 00132 00133 // Check for aqt timeouts 00134 for (__ait = __aqts.begin(); __ait != __aqts.end();) { 00135 if ( __ait->second->vision_threads->empty() && 00136 (__ait->second->vision_threads->empty_time() > __aqt_timeout) ) { 00137 00138 logger->log_info(name(), "Acquisition thread %s timed out, destroying", 00139 __ait->second->name()); 00140 00141 00142 thread_collector->remove(__ait->second); 00143 delete __ait->second; 00144 __aqts.erase(__ait++); 00145 } else { 00146 ++__ait; 00147 } 00148 } 00149 00150 __started_threads.lock(); 00151 fawkes::LockMap<Thread *, FvAcquisitionThread *>::iterator stit = __started_threads.begin(); 00152 while (stit != __started_threads.end()) { 00153 00154 logger->log_info(name(), "Thread %s has been started, %zu", 00155 stit->second->name(), __started_threads.size()); 00156 00157 // if the thread is registered in that aqt mark it running 00158 stit->second->vision_threads->set_thread_running(stit->first); 00159 00160 if ( stit->second->vision_threads->has_cyclic_thread() ) { 00161 if (stit->second->aqtmode() != FvAcquisitionThread::AqtCyclic ) { 00162 logger->log_info(name(), "Switching acquisition thread %s to cyclic mode", 00163 stit->second->name()); 00164 00165 stit->second->prepare_finalize(); 00166 stit->second->cancel(); 00167 stit->second->join(); 00168 stit->second->set_aqtmode(FvAcquisitionThread::AqtCyclic); 00169 stit->second->start(); 00170 stit->second->cancel_finalize(); 00171 } 00172 } else if (stit->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) { 00173 logger->log_info(name(), "Switching acquisition thread %s to continuous mode", 00174 stit->second->name()); 00175 stit->second->prepare_finalize(); 00176 stit->second->cancel(); 00177 stit->second->join(); 00178 stit->second->set_aqtmode(FvAcquisitionThread::AqtContinuous); 00179 stit->second->start(); 00180 stit->second->cancel_finalize(); 00181 } 00182 00183 // Make thread actually capture data 00184 stit->second->set_enabled(true); 00185 00186 fawkes::LockMap<Thread *, FvAcquisitionThread *>::iterator stittmp = stit; 00187 ++stit; 00188 __started_threads.erase( stittmp ); 00189 } 00190 __started_threads.unlock(); 00191 00192 // Re-create barrier as necessary after _adding_ threads 00193 unsigned int num_cyclic_threads = 0; 00194 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) { 00195 if ( __ait->second->vision_threads->has_cyclic_thread() ) { 00196 ++num_cyclic_threads; 00197 } 00198 } 00199 cond_recreate_barrier(num_cyclic_threads); 00200 00201 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) { 00202 __ait->second->set_vt_prepfin_hold(false); 00203 } 00204 00205 __aqts.unlock(); 00206 } 00207 00208 00209 /** Get vision master. 00210 * @return vision master 00211 */ 00212 VisionMaster * 00213 FvBaseThread::vision_master() 00214 { 00215 return this; 00216 } 00217 00218 00219 Camera * 00220 FvBaseThread::register_for_camera(const char *camera_string, Thread *thread, 00221 colorspace_t cspace) 00222 { 00223 Camera *c = NULL; 00224 __aqts.lock(); 00225 00226 logger->log_info(name(), "Thread '%s' registers for camera '%s'", thread->name(), camera_string); 00227 00228 VisionAspect *vision_thread = dynamic_cast<VisionAspect *>(thread); 00229 if ( vision_thread == NULL ) { 00230 throw TypeMismatchException("Thread is not a vision thread"); 00231 } 00232 00233 CameraArgumentParser *cap = new CameraArgumentParser(camera_string); 00234 try { 00235 std::string id = cap->cam_type() + "." + cap->cam_id(); 00236 if ( __aqts.find(id) != __aqts.end() ) { 00237 // this camera has already been loaded 00238 c = __aqts[id]->camera_instance(cspace, 00239 (vision_thread->vision_thread_mode() == 00240 VisionAspect::CONTINUOUS)); 00241 00242 __aqts[id]->vision_threads->add_waiting_thread(thread); 00243 00244 } else { 00245 Camera *cam = NULL; 00246 try { 00247 cam = CameraFactory::instance(cap); 00248 cam->open(); 00249 cam->start(); 00250 } catch (Exception &e) { 00251 delete cam; 00252 e.append("Could not open or start camera"); 00253 throw; 00254 } 00255 00256 FvAcquisitionThread *aqt = new FvAcquisitionThread(id.c_str(), cam, logger, clock); 00257 00258 c = aqt->camera_instance(cspace, (vision_thread->vision_thread_mode() == 00259 VisionAspect::CONTINUOUS)); 00260 00261 aqt->vision_threads->add_waiting_thread(thread); 00262 00263 __aqts[id] = aqt; 00264 thread_collector->add(aqt); 00265 00266 // no need to recreate barrier, by default aqts operate in continuous mode 00267 00268 logger->log_info(name(), "Acquisition thread '%s' started for thread '%s' and camera '%s'", 00269 aqt->name(), thread->name(), id.c_str()); 00270 00271 } 00272 00273 thread->add_notification_listener(this); 00274 00275 } catch (UnknownCameraTypeException &e) { 00276 delete cap; 00277 e.append("FvBaseVisionMaster: could not instantiate camera"); 00278 __aqts.unlock(); 00279 throw; 00280 } catch (Exception &e) { 00281 delete cap; 00282 e.append("FvBaseVisionMaster: could not open or start camera"); 00283 __aqts.unlock(); 00284 throw; 00285 } 00286 00287 delete cap; 00288 00289 __aqts.unlock(); 00290 return c; 00291 } 00292 00293 00294 Camera * 00295 FvBaseThread::register_for_raw_camera(const char *camera_string, Thread *thread) 00296 { 00297 Camera *camera = register_for_camera(camera_string, thread, CS_UNKNOWN); 00298 CameraArgumentParser cap(camera_string); 00299 try { 00300 std::string id = cap.cam_type() + "." + cap.cam_id(); 00301 __aqts.lock(); 00302 if ( __aqts.find(id) != __aqts.end() ) { 00303 __aqts[id]->raw_subscriber_thread = thread; 00304 } 00305 __aqts.unlock(); 00306 } catch (Exception &e) { 00307 __aqts.unlock(); 00308 throw; 00309 } 00310 return camera; 00311 } 00312 00313 CameraControl * 00314 FvBaseThread::create_camctrl(const char *camera_string) 00315 { 00316 CameraControl *cc = CameraControlFactory::instance(camera_string); 00317 if (cc) { 00318 __owned_controls.lock(); 00319 __owned_controls.push_back(cc); 00320 __owned_controls.sort(); 00321 __owned_controls.unique(); 00322 __owned_controls.unlock(); 00323 return cc; 00324 } else { 00325 throw Exception("Cannot create camera control of desired type"); 00326 } 00327 } 00328 00329 CameraControl * 00330 FvBaseThread::acquire_camctrl(const char *cam_string) 00331 { 00332 CameraArgumentParser cap(cam_string); 00333 std::string id = cap.cam_type() + "." + cap.cam_id(); 00334 00335 // Has this camera been loaded? 00336 MutexLocker lock(__aqts.mutex()); 00337 if (__aqts.find(id) != __aqts.end()) { 00338 return CameraControlFactory::instance(__aqts[id]->get_camera()); 00339 } else { 00340 return create_camctrl(cam_string); 00341 } 00342 } 00343 00344 00345 CameraControl * 00346 FvBaseThread::acquire_camctrl(const char *cam_string, 00347 const std::type_info &typeinf) 00348 { 00349 CameraArgumentParser cap(cam_string); 00350 std::string id = cap.cam_type() + "." + cap.cam_id(); 00351 00352 // Has this camera been loaded? 00353 MutexLocker lock(__aqts.mutex()); 00354 if (__aqts.find(id) != __aqts.end()) { 00355 return CameraControlFactory::instance(typeinf, __aqts[id]->get_camera()); 00356 } else { 00357 return create_camctrl(cam_string); 00358 } 00359 } 00360 00361 00362 void 00363 FvBaseThread::release_camctrl(CameraControl *cc) 00364 { 00365 __owned_controls.lock(); 00366 LockList<CameraControl *>::iterator f; 00367 if ((f = std::find(__owned_controls.begin(), __owned_controls.end(), cc)) != __owned_controls.end()) { 00368 delete *f; 00369 __owned_controls.erase(f); 00370 } 00371 __owned_controls.unlock(); 00372 } 00373 00374 00375 /** Conditionally re-create barriers. 00376 * Re-create barriers if the number of cyclic threads has changed. 00377 * @param num_cyclic_threads new number of cyclic threads 00378 */ 00379 void 00380 FvBaseThread::cond_recreate_barrier(unsigned int num_cyclic_threads) 00381 { 00382 if ( (num_cyclic_threads + 1) != __aqt_barrier->count() ) { 00383 delete __aqt_barrier; 00384 __aqt_barrier = new Barrier( num_cyclic_threads + 1 ); // +1 for base thread 00385 } 00386 } 00387 00388 00389 void 00390 FvBaseThread::unregister_thread(Thread *thread) 00391 { 00392 __aqts.lock(); 00393 unsigned int num_cyclic_threads = 0; 00394 00395 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) { 00396 00397 // Remove thread from all aqts 00398 __ait->second->vision_threads->remove_thread(thread); 00399 00400 if (__ait->second->raw_subscriber_thread == thread) { 00401 __ait->second->raw_subscriber_thread = NULL; 00402 } 00403 00404 if ( __ait->second->vision_threads->has_cyclic_thread() ) { 00405 ++num_cyclic_threads; 00406 00407 } else if (__ait->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) { 00408 logger->log_info(name(), "Switching acquisition thread %s to continuous mode " 00409 "on unregister", __ait->second->name()); 00410 00411 __ait->second->prepare_finalize(); 00412 __ait->second->cancel(); 00413 __ait->second->join(); 00414 __ait->second->set_aqtmode(FvAcquisitionThread::AqtContinuous); 00415 __ait->second->start(); 00416 __ait->second->cancel_finalize(); 00417 } 00418 } 00419 // Recreate as necessary after _removing_ threads 00420 cond_recreate_barrier(num_cyclic_threads); 00421 00422 __aqts.unlock(); 00423 } 00424 00425 00426 bool 00427 FvBaseThread::thread_started(Thread *thread) throw() 00428 { 00429 __aqts.lock(); 00430 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) { 00431 if (__ait->second->vision_threads->has_waiting_thread(thread)) { 00432 __started_threads.lock(); 00433 __started_threads[thread] = __ait->second; 00434 __started_threads.unlock(); 00435 } 00436 } 00437 __aqts.unlock(); 00438 00439 return false; 00440 } 00441 00442 00443 bool 00444 FvBaseThread::thread_init_failed(Thread *thread) throw() 00445 { 00446 __aqts.lock(); 00447 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) { 00448 __ait->second->vision_threads->remove_waiting_thread(thread); 00449 } 00450 __aqts.unlock(); 00451 00452 return false; 00453 }