Fawkes API Fawkes Development Version

notifier.cpp

00001  
00002 /***************************************************************************
00003  *  notifier.cpp - BlackBoard notifier
00004  *
00005  *  Created: Mon Mar 03 23:28:18 2008
00006  *  Copyright  2006-2008  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <blackboard/internal/notifier.h>
00025 #include <blackboard/blackboard.h>
00026 #include <blackboard/interface_listener.h>
00027 #include <blackboard/interface_observer.h>
00028 
00029 #include <core/threading/mutex.h>
00030 #include <core/threading/mutex_locker.h>
00031 #include <core/threading/wait_condition.h>
00032 #include <core/utils/lock_hashset.h>
00033 #include <core/utils/lock_hashmap.h>
00034 #include <utils/logging/liblogger.h>
00035 #include <interface/interface.h>
00036 
00037 #include <algorithm>
00038 #include <functional>
00039 #include <cstdlib>
00040 #include <cstring>
00041 #include <fnmatch.h>
00042 
00043 namespace fawkes {
00044 
00045 /** @class BlackBoardNotifier <blackboard/internal/notifier.h>
00046  * BlackBoard notifier.
00047  * This class is used by the BlackBoard to notify listeners and observers
00048  * of changes. 
00049  *
00050  * @author Tim Niemueller
00051  */
00052 
00053 
00054 /** Constructor. */
00055 BlackBoardNotifier::BlackBoardNotifier()
00056 {
00057   __bbil_writer_events     = 0;
00058   __bbil_writer_mutex      = new Mutex();
00059   __bbil_writer_waitcond   = new WaitCondition(__bbil_writer_mutex);
00060 
00061   __bbil_reader_events     = 0;
00062   __bbil_reader_mutex      = new Mutex();
00063   __bbil_reader_waitcond   = new WaitCondition(__bbil_reader_mutex);
00064 
00065   __bbil_data_events       = 0;
00066   __bbil_data_mutex        = new Mutex();
00067   __bbil_data_waitcond     = new WaitCondition(__bbil_data_mutex);
00068 
00069   __bbil_messages_events   = 0;
00070   __bbil_messages_mutex    = new Mutex();
00071   __bbil_messages_waitcond = new WaitCondition(__bbil_messages_mutex);
00072 
00073   __bbio_events            = 0;
00074   __bbio_mutex             = new Mutex();
00075   __bbio_waitcond          = new WaitCondition(__bbio_mutex);
00076 }
00077 
00078 
00079 /** Destructor */
00080 BlackBoardNotifier::~BlackBoardNotifier()
00081 {
00082   delete __bbil_writer_waitcond;
00083   delete __bbil_writer_mutex;
00084 
00085   delete __bbil_reader_waitcond;
00086   delete __bbil_reader_mutex;
00087 
00088   delete __bbil_data_waitcond;
00089   delete __bbil_data_mutex;
00090 
00091   delete __bbil_messages_waitcond;
00092   delete __bbil_messages_mutex;
00093 
00094   delete __bbio_waitcond;
00095   delete __bbio_mutex;
00096 }
00097 
00098 
00099 /** Register BB event listener.
00100  * @param listener BlackBoard event listener to register
00101  * @param flags an or'ed combination of BBIL_FLAG_DATA, BBIL_FLAG_READER, BBIL_FLAG_WRITER
00102  * and BBIL_FLAG_INTERFACE. Only for the given types the event listener is registered.
00103  * BBIL_FLAG_ALL can be supplied to register for all events.
00104  */
00105 void
00106 BlackBoardNotifier::register_listener(BlackBoardInterfaceListener *listener,
00107                                       unsigned int flags)
00108 {
00109   if ( flags & BlackBoard::BBIL_FLAG_DATA ) {
00110     __bbil_data_mutex->lock();
00111     if (__bbil_data_events > 0) {
00112       LibLogger::log_warn("BlackBoardNotifier", "Registering interface listener %s "
00113                           "for data events queued",
00114                           listener->bbil_name());
00115       __bbil_data_queue.push_back(std::make_pair(true, listener));
00116     } else {
00117       add_listener(listener, listener->bbil_data_interfaces(), __bbil_data);
00118     }
00119     __bbil_data_mutex->unlock();
00120   }
00121   if ( flags & BlackBoard::BBIL_FLAG_MESSAGES ) {
00122     __bbil_messages_mutex->lock();
00123     BlackBoardInterfaceListener::InterfaceLockMapIterator i;
00124     BlackBoardInterfaceListener::InterfaceLockMap *im = listener->bbil_message_interfaces();
00125     MutexLocker lock(im->mutex());
00126     for (i = im->begin(); i != im->end(); ++i) {
00127       if ( ! i->second->is_writer() ||
00128            (__bbil_messages.find(i->first) != __bbil_messages.end()) ) {
00129         __bbil_messages_mutex->unlock();
00130         throw Exception("An interface listener has already been registered for %s",
00131                         i->first.c_str());
00132       }
00133     }
00134     for (i = im->begin(); i != im->end(); ++i) {
00135       __bbil_messages[i->first] = listener;
00136     }
00137     __bbil_messages_mutex->unlock();
00138   }
00139   if ( flags & BlackBoard::BBIL_FLAG_READER ) {
00140     __bbil_reader_mutex->lock();
00141     if (__bbil_reader_events > 0) {
00142       LibLogger::log_warn("BlackBoardNotifier", "Registering interface listener %s "
00143                           "for reader events queued",
00144                           listener->bbil_name());
00145       __bbil_reader_queue.push_back(std::make_pair(true, listener));
00146     } else {
00147       add_listener(listener, listener->bbil_reader_interfaces(), __bbil_reader);
00148     }
00149     __bbil_reader_mutex->unlock();
00150   }
00151   if ( flags & BlackBoard::BBIL_FLAG_WRITER ) {
00152     __bbil_writer_mutex->lock();
00153     if (__bbil_writer_events > 0) {
00154       LibLogger::log_warn("BlackBoardNotifier", "Registering interface listener %s "
00155                           "for writer events queued",
00156                           listener->bbil_name());
00157       __bbil_writer_queue.push_back(std::make_pair(true, listener));
00158     } else {
00159       add_listener(listener, listener->bbil_writer_interfaces(), __bbil_writer);
00160     }
00161     __bbil_writer_mutex->unlock();
00162   }
00163 }
00164 
00165 
00166 /** Unregister BB interface listener.
00167  * This will remove the given BlackBoard interface listener from any event that it was
00168  * previously registered for.
00169  * @param listener BlackBoard event listener to remove
00170  */
00171 void
00172 BlackBoardNotifier::unregister_listener(BlackBoardInterfaceListener *listener)
00173 {
00174   remove_listener(listener, __bbil_writer_mutex, __bbil_writer_events,
00175                   __bbil_writer_queue, __bbil_writer);
00176   remove_listener(listener, __bbil_reader_mutex, __bbil_reader_events,
00177                   __bbil_reader_queue, __bbil_reader);
00178   remove_listener(listener, __bbil_data_mutex, __bbil_data_events,
00179                   __bbil_data_queue, __bbil_data);
00180   remove_message_listener(listener);
00181 }
00182 
00183 /** Add listener for specified map..
00184  * @param listener interface listener for events
00185  * @param im map of interfaces to listen for
00186  * @param ilmap internal map to add listener to
00187  */
00188 void
00189 BlackBoardNotifier::add_listener(BlackBoardInterfaceListener *listener,
00190                                  BlackBoardInterfaceListener::InterfaceLockMap *im,
00191                                  BBilMap &ilmap)
00192 {
00193   BlackBoardInterfaceListener::InterfaceLockMapIterator i;
00194   im->lock();
00195   for (i = im->begin(); i != im->end(); ++i) {
00196     ilmap[i->first].push_back(listener);
00197   }
00198   im->unlock();
00199 }
00200 
00201 void
00202 BlackBoardNotifier::remove_listener(BlackBoardInterfaceListener *listener,
00203                                     Mutex *mutex, unsigned int events,
00204                                     BBilQueue &queue, BBilMap &ilmap)
00205 {
00206   MutexLocker lock(mutex);
00207   if (events > 0) {
00208     //LibLogger::log_warn("BlackBoardNotifier", "UN-registering interface listener %s queued",
00209     //                    listener->bbil_name());
00210 
00211     BBilQueue::iterator re;
00212     if ( (re = find(queue.begin(), queue.end(),
00213                     std::make_pair(true, listener))) != queue.end()) {
00214       // if there is an entry in the register queue, remove it!
00215       queue.erase(re);
00216     }
00217     queue.push_back(std::make_pair(false, listener));
00218   } else {
00219     remove_listener(ilmap, listener);
00220   }
00221 }
00222 
00223 
00224 /** Remove listener from map.
00225  * @param ilmap interface listener map to remove the listener from
00226  * @param listener listener to remove
00227  */
00228 void
00229 BlackBoardNotifier::remove_listener(BBilMap &ilmap, BlackBoardInterfaceListener *listener)
00230 {
00231   BBilMapIterator i, tmp;
00232 
00233   i = ilmap.begin();;
00234   while (i != ilmap.end()) {
00235     BBilListIterator j = i->second.begin();
00236     while (j != i->second.end()) {
00237       if ( *j == listener ) {
00238         j = i->second.erase(j);
00239       } else {
00240         ++j;
00241       }
00242     }
00243     if ( i->second.empty() ) {
00244       tmp = i;
00245       ++i;
00246       ilmap.erase(tmp);
00247     } else {
00248       ++i;
00249     }
00250   }
00251 }
00252 
00253 
00254 void
00255 BlackBoardNotifier::remove_message_listener_map(BlackBoardInterfaceListener *listener)
00256 {
00257   BBilMessageLockMapIterator i, tmp;
00258 
00259   i = __bbil_messages.begin();;
00260   while (i != __bbil_messages.end()) {
00261     if ( i->second == listener ) {
00262       // found!
00263       tmp = i;
00264       ++i;
00265       __bbil_messages.erase(tmp);
00266     } else {
00267       ++i;
00268     }
00269   }
00270 }
00271 
00272 
00273 void
00274 BlackBoardNotifier::remove_message_listener(BlackBoardInterfaceListener *listener)
00275 {
00276   __bbil_messages_mutex->lock();
00277   if (__bbil_messages_events > 0) {
00278     //LibLogger::log_warn("BlackBoardNotifier", "UN-registering interface (message) listener %s queued",
00279     //                  listener->bbil_name());
00280 
00281     BBilQueue::iterator re;
00282     if ( (re = find(__bbil_messages_queue.begin(), __bbil_messages_queue.end(),
00283                     std::make_pair(true, listener))) != __bbil_messages_queue.end()) {
00284       // if there is an entry in the register queue, remove it!
00285       __bbil_messages_queue.erase(re);
00286     }
00287     __bbil_messages_queue.push_back(std::make_pair(false, listener));
00288   } else {
00289     remove_message_listener_map(listener);
00290   }
00291   __bbil_messages_mutex->unlock();
00292 }
00293 
00294 
00295 /** Register BB interface observer.
00296  * @param observer BlackBoard interface observer to register
00297  * @param flags an or'ed combination of BBIO_FLAG_CREATED, BBIO_FLAG_DESTROYED
00298  */
00299 void
00300 BlackBoardNotifier::register_observer(BlackBoardInterfaceObserver *observer,
00301                                       unsigned int flags)
00302 {
00303   __bbio_mutex->lock();
00304   if (__bbio_events > 0) {
00305     __bbio_queue.push_back(std::make_pair(flags, observer));
00306   } else {
00307     if ( flags & BlackBoard::BBIO_FLAG_CREATED ) {
00308       add_observer(observer, observer->bbio_get_observed_create(), __bbio_created);
00309     }
00310     if ( flags & BlackBoard::BBIO_FLAG_DESTROYED ) {
00311       add_observer(observer, observer->bbio_get_observed_destroy(), __bbio_destroyed);
00312     }
00313   }
00314   __bbio_mutex->unlock();
00315 }
00316 
00317 
00318 void
00319 BlackBoardNotifier::add_observer(BlackBoardInterfaceObserver *observer,
00320                                  BlackBoardInterfaceObserver::ObservedInterfaceLockMap *its,
00321                                  BBioMap &bbiomap)
00322 {
00323   BlackBoardInterfaceObserver::ObservedInterfaceLockMapIterator i;
00324   its->lock();
00325   for (i = its->begin(); i != its->end(); ++i) {
00326     bbiomap[i->first].push_back(make_pair(observer, i->second));
00327   }
00328   its->unlock();
00329 }
00330 
00331 
00332 /** Remove observer from map.
00333  * @param iomap interface observer map to remove the observer from
00334  * @param observer observer to remove
00335  */
00336 void
00337 BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer)
00338 {
00339   BBioMapIterator i, tmp;
00340 
00341   i = iomap.begin();
00342   while (i != iomap.end()) {
00343     BBioListIterator j = i->second.begin();
00344     while (j != i->second.end()) {
00345       if ( j->first == observer ) {
00346         j = i->second.erase(j);
00347       } else {
00348         ++j;
00349       }
00350     }
00351     if ( i->second.empty() ) {
00352       tmp = i;
00353       ++i;
00354       iomap.erase(tmp);
00355     } else {
00356       ++i;
00357     }
00358   }
00359 }
00360 
00361 /** Unregister BB interface observer.
00362  * This will remove the given BlackBoard event listener from any event that it was
00363  * previously registered for.
00364  * @param observer BlackBoard event listener to remove
00365  */
00366 void
00367 BlackBoardNotifier::unregister_observer(BlackBoardInterfaceObserver *observer)
00368 {
00369   MutexLocker lock(__bbio_mutex);
00370   if ( __bbio_events > 0) {
00371     BBioQueueEntry e = std::make_pair((unsigned int)0, observer);
00372     BBioQueue::iterator re;
00373     while ( (re = find_if(__bbio_queue.begin(), __bbio_queue.end(),
00374                           bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
00375             != __bbio_queue.end()) {
00376       // if there is an entry in the register queue, remove it!
00377       if (re->second == observer) {
00378         __bbio_queue.erase(re);
00379       }
00380     }
00381     __bbio_queue.push_back(std::make_pair(0, observer));
00382 
00383   } else {
00384     remove_observer(__bbio_created, observer);
00385     remove_observer(__bbio_destroyed, observer);
00386   }
00387 }
00388 
00389 /** Notify that an interface has been created.
00390  * @param type type of the interface
00391  * @param id ID of the interface
00392  */
00393 void
00394 BlackBoardNotifier::notify_of_interface_created(const char *type, const char *id) throw()
00395 {
00396   __bbio_mutex->lock();
00397   __bbio_events += 1;
00398   __bbio_mutex->unlock();
00399 
00400   BBioMapIterator lhmi;
00401   BBioListIterator i, l;
00402   if ( (lhmi = __bbio_created.find(type)) != __bbio_created.end() ) {
00403     BBioList &list = (*lhmi).second;
00404     for (i = list.begin(); i != list.end(); ++i) {
00405       BlackBoardInterfaceObserver *bbio = i->first;
00406       for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
00407         if (fnmatch(pi->c_str(), id, 0) == 0) {
00408           bbio->bb_interface_created(type, id);
00409           break;
00410         }
00411       }
00412     }
00413   }
00414 
00415   __bbio_mutex->lock();
00416   __bbio_events -= 1;
00417   process_bbio_queue();
00418   __bbio_mutex->unlock();
00419 }
00420 
00421 
00422 /** Notify that an interface has been destroyed.
00423  * @param type type of the interface
00424  * @param id ID of the interface
00425  */
00426 void
00427 BlackBoardNotifier::notify_of_interface_destroyed(const char *type, const char *id) throw()
00428 {
00429   __bbio_mutex->lock();
00430   __bbio_events += 1;
00431   __bbio_mutex->unlock();
00432 
00433   BBioMapIterator lhmi;
00434   BBioListIterator i, l;
00435   if ( (lhmi = __bbio_destroyed.find(type)) != __bbio_destroyed.end() ) {
00436     BBioList &list = (*lhmi).second;
00437     for (i = list.begin(); i != list.end(); ++i) {
00438       BlackBoardInterfaceObserver *bbio = i->first;
00439       for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
00440         if (fnmatch(pi->c_str(), id, 0) == 0) {
00441           bbio->bb_interface_destroyed(type, id);
00442           break;
00443         }
00444       }
00445     }
00446   }
00447 
00448   __bbio_mutex->lock();
00449   __bbio_events -= 1;
00450   process_bbio_queue();
00451   __bbio_mutex->unlock();
00452 }
00453 
00454 
00455 void
00456 BlackBoardNotifier::process_bbio_queue()
00457 {
00458   if ( ! __bbio_queue.empty() ) {
00459     if (__bbio_events > 0 ) {
00460       __bbio_waitcond->wait();
00461     } else {
00462       while (! __bbio_queue.empty()) {
00463         BBioQueueEntry &e = __bbio_queue.front();
00464         if (e.first & BlackBoard::BBIO_FLAG_CREATED) { // register create
00465           add_observer(e.second, e.second->bbio_get_observed_create(), __bbio_created);
00466         } else if (e.first & BlackBoard::BBIO_FLAG_DESTROYED) { // register destroy
00467           add_observer(e.second, e.second->bbio_get_observed_destroy(), __bbio_destroyed);
00468         } else {       // unregister
00469           remove_observer(__bbio_created, e.second);
00470           remove_observer(__bbio_destroyed, e.second);
00471         }
00472         __bbio_queue.pop_front();
00473       }
00474       __bbio_waitcond->wake_all();
00475     }
00476   }
00477 }
00478 
00479 
00480 /** Notify that writer has been added.
00481  * @param interface the interface for which the event happened. It is not necessarily the
00482  * instance which caused the event, but it must have the same mem serial.
00483  * @param event_instance_serial the instance serial of the interface that caused the event
00484  * @see BlackBoardInterfaceListener::bb_interface_writer_added()
00485  */
00486 void
00487 BlackBoardNotifier::notify_of_writer_added(const Interface *interface,
00488                                            unsigned int event_instance_serial) throw()
00489 {
00490   __bbil_writer_mutex->lock();
00491   if ( (__bbil_writer_events > 0) && ! __bbil_writer_queue.empty() ) {
00492     __bbil_writer_waitcond->wait();
00493   }
00494   __bbil_writer_events += 1;
00495   __bbil_writer_mutex->unlock();
00496 
00497   BBilMapIterator lhmi;
00498   BBilListIterator i, l;
00499   const char *uid = interface->uid();
00500   if ( (lhmi = __bbil_writer.find(uid)) != __bbil_writer.end() ) {
00501     BBilList &list = (*lhmi).second;
00502     for (i = list.begin(); i != list.end(); ++i) {
00503       BlackBoardInterfaceListener *bbil = (*i);
00504       Interface *bbil_iface = bbil->bbil_writer_interface(uid);
00505       if (bbil_iface != NULL ) {
00506         bbil->bb_interface_writer_added(bbil_iface, event_instance_serial);
00507       } else {
00508         LibLogger::log_warn("BlackBoardNotifier",
00509                             "BBIL[%s] registered for writer events "
00510                             "(open) for '%s' but has no such interface",
00511                             bbil->bbil_name(), uid);
00512       }
00513     }
00514   }
00515 
00516   __bbil_writer_mutex->lock();
00517   __bbil_writer_events -= 1;
00518   process_writer_queue();
00519   __bbil_writer_mutex->unlock();
00520 }
00521 
00522 
00523 /** Notify that writer has been removed.
00524  * @param interface interface for which the writer has been removed
00525  * @param event_instance_serial instance serial of the interface that caused the event
00526  * @see BlackBoardInterfaceListener::bb_interface_writer_removed()
00527  */
00528 void
00529 BlackBoardNotifier::notify_of_writer_removed(const Interface *interface,
00530                                              unsigned int event_instance_serial) throw()
00531 {
00532   __bbil_writer_mutex->lock();
00533   if ( (__bbil_writer_events > 0) && ! __bbil_writer_queue.empty() ) {
00534     __bbil_writer_waitcond->wait();
00535   }
00536   __bbil_writer_events += 1;
00537   __bbil_writer_mutex->unlock();
00538 
00539   BBilMapIterator lhmi;
00540   BBilListIterator i, l;
00541   const char *uid = interface->uid();
00542   if ( (lhmi = __bbil_writer.find(uid)) != __bbil_writer.end() ) {
00543     BBilList &list = (*lhmi).second;
00544     for (i = list.begin(); i != list.end(); ++i) {
00545       BlackBoardInterfaceListener *bbil = (*i);
00546       Interface *bbil_iface = bbil->bbil_writer_interface(uid);
00547       if (bbil_iface != NULL) {
00548         if (bbil_iface->serial() != event_instance_serial) {
00549           bbil->bb_interface_writer_removed(bbil_iface, event_instance_serial);
00550         }
00551       } else {
00552         LibLogger::log_warn("BlackBoardNotifier",
00553                             "BBIL[%s] registered for writer events "
00554                             "(close) for '%s' but has no such interface",
00555                             bbil->bbil_name(), uid);
00556       }
00557     }
00558   }
00559 
00560   __bbil_writer_mutex->lock();
00561   __bbil_writer_events -= 1;
00562   process_writer_queue();
00563   __bbil_writer_mutex->unlock();
00564 }
00565 
00566 void
00567 BlackBoardNotifier::process_writer_queue()
00568 {
00569   if ( ! __bbil_writer_queue.empty() ) {
00570     if (__bbil_writer_events > 0 ) {
00571       __bbil_writer_waitcond->wait();
00572     } else {
00573       while (! __bbil_writer_queue.empty()) {
00574         BBilQueueEntry &e = __bbil_writer_queue.front();
00575         if (e.first) { // register
00576           add_listener(e.second, e.second->bbil_writer_interfaces(), __bbil_writer);
00577         } else {       // unregister
00578           remove_listener(__bbil_writer, e.second);
00579         }
00580         __bbil_writer_queue.pop_front();
00581       }
00582       __bbil_writer_waitcond->wake_all();
00583     }
00584   }
00585 }
00586 
00587 
00588 /** Notify that reader has been added.
00589  * @param interface interface for which the reader has been added
00590  * @param event_instance_serial instance serial of the interface that caused the event
00591  * @see BlackBoardInterfaceListener::bb_interface_reader_added()
00592  */
00593 void
00594 BlackBoardNotifier::notify_of_reader_added(const Interface *interface,
00595                                            unsigned int event_instance_serial) throw()
00596 {
00597   __bbil_reader_mutex->lock();
00598   if ( (__bbil_reader_events > 0) && ! __bbil_reader_queue.empty() ) {
00599     __bbil_reader_waitcond->wait();
00600   }
00601   __bbil_reader_events += 1;
00602   __bbil_reader_mutex->unlock();
00603 
00604   BBilMapIterator lhmi;
00605   BBilListIterator i, l;
00606   const char *uid = interface->uid();
00607   if ( (lhmi = __bbil_reader.find(uid)) != __bbil_reader.end() ) {
00608     BBilList &list = (*lhmi).second;
00609     for (i = list.begin(); i != list.end(); ++i) {
00610       BlackBoardInterfaceListener *bbil = (*i);
00611       Interface *bbil_iface = bbil->bbil_reader_interface(uid);
00612       if (bbil_iface != NULL ) {
00613         bbil->bb_interface_reader_added(bbil_iface, event_instance_serial);
00614       } else {
00615         LibLogger::log_warn("BlackBoardNotifier",
00616                             "BBIL[%s] registered for reader events "
00617                             "(open) for '%s' but has no such interface",
00618                             bbil->bbil_name(), uid);
00619       }
00620     }
00621   }
00622 
00623   __bbil_reader_mutex->lock();
00624   __bbil_reader_events -= 1;
00625   process_reader_queue();
00626   __bbil_reader_mutex->unlock();
00627 }
00628 
00629 
00630 /** Notify that reader has been removed.
00631  * @param interface interface for which the reader has been removed
00632  * @param event_instance_serial instance serial of the interface that caused the event
00633  * @see BlackBoardInterfaceListener::bb_interface_reader_removed()
00634  */
00635 void
00636 BlackBoardNotifier::notify_of_reader_removed(const Interface *interface,
00637                                              unsigned int event_instance_serial) throw()
00638 {
00639   __bbil_reader_mutex->lock();
00640   if ( (__bbil_reader_events > 0) && ! __bbil_reader_queue.empty() ) {
00641     __bbil_reader_waitcond->wait();
00642   }
00643   __bbil_reader_events += 1;
00644   __bbil_reader_mutex->unlock();
00645 
00646   BBilMapIterator lhmi;
00647   BBilListIterator i, l;
00648   const char *uid = interface->uid();
00649   if ( (lhmi = __bbil_reader.find(uid)) != __bbil_reader.end() ) {
00650     BBilList &list = (*lhmi).second;
00651     for (i = list.begin(); i != list.end(); ++i) {
00652       BlackBoardInterfaceListener *bbil = (*i);
00653       Interface *bbil_iface = bbil->bbil_reader_interface(uid);
00654       if (bbil_iface != NULL) {
00655         if (bbil_iface->serial() != event_instance_serial) {
00656           bbil->bb_interface_reader_removed(bbil_iface, event_instance_serial);
00657         }
00658       } else {
00659         LibLogger::log_warn("BlackBoardNotifier",
00660                             "BBIL[%s] registered for reader events "
00661                             "(close) for '%s' but has no such interface",
00662                             bbil->bbil_name(), uid);
00663       }
00664     }
00665   }
00666 
00667   __bbil_reader_mutex->lock();
00668   __bbil_reader_events -= 1;
00669   process_reader_queue();
00670   __bbil_reader_mutex->unlock();
00671 }
00672 
00673 
00674 void
00675 BlackBoardNotifier::process_reader_queue()
00676 {
00677   if ( ! __bbil_reader_queue.empty() ) {
00678     if (__bbil_reader_events > 0 ) {
00679       __bbil_reader_waitcond->wait();
00680     } else {
00681       while (! __bbil_reader_queue.empty()) {
00682         BBilQueueEntry &e = __bbil_reader_queue.front();
00683         if (e.first) { // register
00684           add_listener(e.second, e.second->bbil_reader_interfaces(), __bbil_reader);
00685         } else {       // unregister
00686           remove_listener(__bbil_reader, e.second);
00687         }
00688         __bbil_reader_queue.pop_front();
00689       }
00690       __bbil_reader_waitcond->wake_all();
00691     }
00692   }
00693 }
00694 
00695 
00696 /** Notify of data change.
00697  * Notify all subscribers of the given interface of a data change.
00698  * This also influences logging and sending data over the network so it is
00699  * mandatory to call this function! The interface base class write method does
00700  * that for you.
00701  * @param interface interface whose subscribers to notify
00702  * @see Interface::write()
00703  * @see BlackBoardInterfaceListener::bb_interface_data_changed()
00704  */
00705 void
00706 BlackBoardNotifier::notify_of_data_change(const Interface *interface)
00707 {
00708   __bbil_data_mutex->lock();
00709   if ( (__bbil_data_events > 0) && ! __bbil_data_queue.empty() ) {
00710     __bbil_data_waitcond->wait();
00711   }
00712   __bbil_data_events += 1;
00713   __bbil_data_mutex->unlock();
00714 
00715   BBilMapIterator lhmi;
00716   BBilListIterator i, l;
00717   const char *uid = interface->uid();
00718   if ( (lhmi = __bbil_data.find(uid)) != __bbil_data.end() ) {
00719     BBilList &list = (*lhmi).second;
00720     for (i = list.begin(); i != list.end(); ++i) {
00721       BlackBoardInterfaceListener *bbil = (*i);
00722       Interface *bbil_iface = bbil->bbil_data_interface(uid);
00723       if (bbil_iface != NULL) {
00724         bbil->bb_interface_data_changed(bbil_iface);
00725       } else {
00726         LibLogger::log_warn("BlackBoardNotifier",
00727                             "BBIL[%s] registered for data change events "
00728                             "for '%s' but has no such interface",
00729                             bbil->bbil_name(), uid);
00730       }
00731     }
00732   }
00733 
00734   __bbil_data_mutex->lock();
00735   __bbil_data_events -= 1;
00736   if ( ! __bbil_data_queue.empty() ) {
00737     if (__bbil_data_events > 0 ) {
00738       __bbil_data_waitcond->wait();
00739     } else {
00740       while (! __bbil_data_queue.empty()) {
00741         BBilQueueEntry &e = __bbil_data_queue.front();
00742         if (e.first) { // register
00743           add_listener(e.second, e.second->bbil_data_interfaces(), __bbil_data);
00744         } else {       // unregister
00745           remove_listener(__bbil_data, e.second);
00746         }
00747         __bbil_data_queue.pop_front();
00748       }
00749       __bbil_data_waitcond->wake_all();
00750     }
00751   }
00752   __bbil_data_mutex->unlock();
00753 }
00754 
00755 
00756 /** Notify of message received
00757  * Notify all subscribers of the given interface of an incoming message
00758  * This also influences logging and sending data over the network so it is
00759  * mandatory to call this function! The interface base class write method does
00760  * that for you.
00761  * @param interface interface whose subscribers to notify
00762  * @param message message which is being received
00763  * @return true if any of the listeners did return true, false if none returned
00764  * true at all.
00765  * @see BlackBoardInterfaceListener::bb_interface_message_received()
00766  */
00767 bool
00768 BlackBoardNotifier::notify_of_message_received(const Interface *interface, Message *message)
00769 {
00770   __bbil_messages_mutex->lock();
00771   if ( (__bbil_messages_events > 0) && ! __bbil_messages_queue.empty() ) {
00772     __bbil_messages_waitcond->wait();
00773   }
00774   __bbil_messages_events += 1;
00775   __bbil_messages_mutex->unlock();
00776 
00777   bool rv = false;
00778 
00779   const char *uid = interface->uid();
00780   if ( __bbil_messages.find(uid) != __bbil_messages.end() ) {
00781     BlackBoardInterfaceListener *bbil = __bbil_messages[uid];
00782 
00783     Interface *bbil_iface = bbil->bbil_message_interface(uid);
00784     if (bbil_iface != NULL ) {
00785       if ( bbil->bb_interface_message_received(bbil_iface, message) ) {
00786           rv = true;
00787       }
00788     } else {
00789       LibLogger::log_warn("BlackBoardNotifier", "BBIL[%s] registered "
00790                           "for message received events for '%s' "
00791                           "but has no such interface",
00792                           bbil->bbil_name(), uid);
00793     }
00794   } else {
00795     rv = true;
00796   }
00797 
00798   __bbil_messages_mutex->lock();
00799   __bbil_messages_events -= 1;
00800   if ( ! __bbil_messages_queue.empty() ) {
00801     if (__bbil_messages_events > 0 ) {
00802       __bbil_messages_waitcond->wait();
00803     } else {
00804       while (! __bbil_messages_queue.empty()) {
00805         BBilQueueEntry &e = __bbil_messages_queue.front();
00806         // register never queues for message event listeners, only unregister does
00807         remove_message_listener_map(e.second);
00808         __bbil_messages_queue.pop_front();
00809       }
00810       __bbil_messages_waitcond->wake_all();
00811     }
00812   }
00813   __bbil_messages_mutex->unlock();
00814 
00815   return rv;
00816 }
00817 
00818 } // end namespace fawkes
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends