Fawkes API Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * remote.h - Remote BlackBoard access via Fawkes network protocol 00004 * 00005 * Created: Mon Mar 03 10:53:00 2008 00006 * Copyright 2006-2008 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <blackboard/remote.h> 00025 #include <blackboard/exceptions.h> 00026 #include <blackboard/net/messages.h> 00027 #include <blackboard/net/ilist_content.h> 00028 #include <blackboard/net/interface_proxy.h> 00029 #include <blackboard/internal/notifier.h> 00030 #include <blackboard/internal/instance_factory.h> 00031 00032 #include <interface/interface_info.h> 00033 00034 #include <core/threading/mutex.h> 00035 #include <core/threading/mutex_locker.h> 00036 #include <core/threading/wait_condition.h> 00037 #include <netcomm/fawkes/client.h> 00038 00039 #include <string> 00040 #include <cstring> 00041 #include <fnmatch.h> 00042 #include <arpa/inet.h> 00043 00044 namespace fawkes { 00045 00046 /** @class RemoteBlackBoard <blackboard/remote.h> 00047 * Remote BlackBoard. 00048 * This class implements the access to a remote BlackBoard using the Fawkes 00049 * network protocol. 00050 * 00051 * @author Tim Niemueller 00052 */ 00053 00054 /** Constructor. 00055 * @param client Fawkes network client to use. 00056 */ 00057 RemoteBlackBoard::RemoteBlackBoard(FawkesNetworkClient *client) 00058 { 00059 __fnc = client; 00060 __fnc_owner = false; 00061 00062 if ( ! __fnc->connected() ) { 00063 throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client"); 00064 } 00065 00066 __fnc->register_handler(this, FAWKES_CID_BLACKBOARD); 00067 00068 __mutex = new Mutex(); 00069 __notifier = new BlackBoardNotifier(); 00070 __instance_factory = new BlackBoardInstanceFactory(); 00071 00072 __wait_mutex = new Mutex(); 00073 __wait_cond = new WaitCondition(__wait_mutex); 00074 00075 __m = NULL; 00076 } 00077 00078 00079 /** Constructor. 00080 * This will internall create a fawkes network client that is used to communicate 00081 * with the remote BlackBoard. 00082 * @param hostname hostname to connect to 00083 * @param port port to connect to 00084 */ 00085 RemoteBlackBoard::RemoteBlackBoard(const char *hostname, unsigned short int port) 00086 { 00087 __fnc = new FawkesNetworkClient(hostname, port); 00088 try { 00089 __fnc->connect(); 00090 } catch (Exception &e) { 00091 delete __fnc; 00092 throw; 00093 } 00094 00095 __fnc_owner = true; 00096 00097 if ( ! __fnc->connected() ) { 00098 throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client"); 00099 } 00100 00101 __fnc->register_handler(this, FAWKES_CID_BLACKBOARD); 00102 00103 __mutex = new Mutex(); 00104 __notifier = new BlackBoardNotifier(); 00105 __instance_factory = new BlackBoardInstanceFactory(); 00106 00107 __wait_mutex = new Mutex(); 00108 __wait_cond = new WaitCondition(__wait_mutex); 00109 00110 __m = NULL; 00111 } 00112 00113 00114 /** Destructor. */ 00115 RemoteBlackBoard::~RemoteBlackBoard() 00116 { 00117 __fnc->deregister_handler(FAWKES_CID_BLACKBOARD); 00118 delete __mutex; 00119 delete __notifier; 00120 delete __instance_factory; 00121 00122 for ( __pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) { 00123 delete __pit->second; 00124 } 00125 00126 if (__fnc_owner) { 00127 __fnc->disconnect(); 00128 delete __fnc; 00129 } 00130 00131 delete __wait_cond; 00132 delete __wait_mutex; 00133 } 00134 00135 00136 bool 00137 RemoteBlackBoard::is_alive() const throw() 00138 { 00139 return __fnc->connected(); 00140 } 00141 00142 00143 void 00144 RemoteBlackBoard::reopen_interfaces() 00145 { 00146 __proxies.lock(); 00147 __ipit = __invalid_proxies.begin(); 00148 while ( __ipit != __invalid_proxies.end() ) { 00149 try { 00150 Interface *iface = (*__ipit)->interface(); 00151 open_interface(iface->type(), iface->id(), iface->is_writer(), iface); 00152 iface->set_validity(true); 00153 __ipit = __invalid_proxies.erase(__ipit); 00154 } catch (Exception &e) { 00155 // we failed to re-establish validity for the given interface, bad luck 00156 ++__ipit; 00157 } 00158 } 00159 __proxies.unlock(); 00160 } 00161 00162 bool 00163 RemoteBlackBoard::try_aliveness_restore() throw() 00164 { 00165 bool rv = true; 00166 try { 00167 if ( ! __fnc->connected() ) { 00168 __fnc->connect(); 00169 00170 reopen_interfaces(); 00171 } 00172 } catch (...) { 00173 rv = false; 00174 } 00175 return rv; 00176 } 00177 00178 00179 void 00180 RemoteBlackBoard::open_interface(const char *type, const char *identifier, 00181 bool writer, Interface *iface) 00182 { 00183 if ( ! __fnc->connected() ) { 00184 throw Exception("Cannot instantiate remote interface, connection is dead"); 00185 } 00186 00187 bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1, sizeof(bb_iopen_msg_t)); 00188 strncpy(om->type, type, __INTERFACE_TYPE_SIZE); 00189 strncpy(om->id, identifier, __INTERFACE_ID_SIZE); 00190 memcpy(om->hash, iface->hash(), __INTERFACE_HASH_SIZE); 00191 00192 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, 00193 writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING, 00194 om, sizeof(bb_iopen_msg_t)); 00195 00196 __wait_mutex->lock(); 00197 __fnc->enqueue(omsg); 00198 while (! __m || 00199 ((__m->msgid() != MSG_BB_OPEN_SUCCESS) && 00200 (__m->msgid() != MSG_BB_OPEN_FAILURE))) { 00201 if ( __m ) { 00202 __m->unref(); 00203 __m = NULL; 00204 } 00205 __wait_cond->wait(); 00206 } 00207 __wait_mutex->unlock(); 00208 00209 if ( __m->msgid() == MSG_BB_OPEN_SUCCESS ) { 00210 // We got the interface, create internal storage and prepare instance for return 00211 BlackBoardInterfaceProxy *proxy = new BlackBoardInterfaceProxy(__fnc, __m, __notifier, 00212 iface, writer); 00213 __proxies[proxy->serial()] = proxy; 00214 } else if ( __m->msgid() == MSG_BB_OPEN_FAILURE ) { 00215 bb_iopenfail_msg_t *fm = __m->msg<bb_iopenfail_msg_t>(); 00216 unsigned int error = ntohl(fm->errno); 00217 __m->unref(); 00218 __m = NULL; 00219 if ( error == BB_ERR_WRITER_EXISTS ) { 00220 throw BlackBoardWriterActiveException(identifier, type); 00221 } else if ( error == BB_ERR_HASH_MISMATCH ) { 00222 throw Exception("Hash mismatch for interface %s:%s", type, identifier); 00223 } else if ( error == BB_ERR_UNKNOWN_TYPE ) { 00224 throw Exception("Type %s unknoen (%s:%s)", type, type, identifier); 00225 } else if ( error == BB_ERR_WRITER_EXISTS ) { 00226 throw BlackBoardWriterActiveException(identifier, type); 00227 } else { 00228 throw Exception("Could not open interface"); 00229 } 00230 } 00231 00232 __m->unref(); 00233 __m = NULL; 00234 } 00235 00236 Interface * 00237 RemoteBlackBoard::open_interface(const char *type, const char *identifier, bool writer) 00238 { 00239 if ( ! __fnc->connected() ) { 00240 throw Exception("Cannot instantiate remote interface, connection is dead"); 00241 } 00242 00243 Interface *iface = __instance_factory->new_interface_instance(type, identifier); 00244 try { 00245 open_interface(type, identifier, writer, iface); 00246 } catch (...) { 00247 __instance_factory->delete_interface_instance(iface); 00248 throw; 00249 } 00250 00251 return iface; 00252 } 00253 00254 00255 Interface * 00256 RemoteBlackBoard::open_for_reading(const char *type, const char *identifier) 00257 { 00258 return open_interface(type, identifier, /* writer? */ false); 00259 } 00260 00261 00262 Interface * 00263 RemoteBlackBoard::open_for_writing(const char *type, const char *identifier) 00264 { 00265 return open_interface(type, identifier, /* writer? */ true); 00266 } 00267 00268 00269 std::list<Interface *> 00270 RemoteBlackBoard::open_multiple_for_reading(const char *type, const char *id_pattern) 00271 { 00272 std::list<Interface *> rv; 00273 00274 InterfaceInfoList *infl = list_all(); 00275 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) { 00276 if ((strncmp(type, i->type(), __INTERFACE_TYPE_SIZE) != 0) || 00277 (fnmatch(id_pattern, i->id(), 0) == FNM_NOMATCH) ) { 00278 // type or ID prefix does not match, go on 00279 continue; 00280 } 00281 00282 try { 00283 Interface *iface = open_for_reading((*i).type(), (*i).id()); 00284 rv.push_back(iface); 00285 } catch (Exception &e) { 00286 for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) { 00287 close(*j); 00288 } 00289 throw; 00290 } 00291 } 00292 00293 return rv; 00294 } 00295 00296 00297 /** Close interface. 00298 * @param interface interface to close 00299 */ 00300 void 00301 RemoteBlackBoard::close(Interface *interface) 00302 { 00303 if ( interface == NULL ) return; 00304 00305 unsigned int serial = interface->serial(); 00306 00307 if ( __proxies.find(serial) != __proxies.end() ) { 00308 delete __proxies[serial]; 00309 __proxies.erase(serial); 00310 } 00311 00312 if ( __fnc->connected() ) { 00313 // We cannot "officially" close it, if we are disconnected it cannot be used anyway 00314 bb_iserial_msg_t *sm = (bb_iserial_msg_t *)calloc(1, sizeof(bb_iserial_msg_t)); 00315 sm->serial = htonl(interface->serial()); 00316 00317 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, 00318 MSG_BB_CLOSE, 00319 sm, sizeof(bb_iserial_msg_t)); 00320 __fnc->enqueue(omsg); 00321 } 00322 00323 __instance_factory->delete_interface_instance(interface); 00324 } 00325 00326 00327 void 00328 RemoteBlackBoard::register_listener(BlackBoardInterfaceListener *listener, unsigned int flags) 00329 { 00330 __notifier->register_listener(listener, flags); 00331 } 00332 00333 00334 void 00335 RemoteBlackBoard::unregister_listener(BlackBoardInterfaceListener *listener) 00336 { 00337 __notifier->unregister_listener(listener); 00338 } 00339 00340 00341 void 00342 RemoteBlackBoard::register_observer(BlackBoardInterfaceObserver *observer, unsigned int flags) 00343 { 00344 __notifier->register_observer(observer, flags); 00345 } 00346 00347 00348 void 00349 RemoteBlackBoard::unregister_observer(BlackBoardInterfaceObserver *observer) 00350 { 00351 __notifier->unregister_observer(observer); 00352 } 00353 00354 00355 InterfaceInfoList * 00356 RemoteBlackBoard::list_all() 00357 { 00358 MutexLocker lock(__mutex); 00359 InterfaceInfoList *infl = new InterfaceInfoList(); 00360 00361 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD, 00362 MSG_BB_LIST_ALL); 00363 __wait_mutex->lock(); 00364 __fnc->enqueue(omsg); 00365 while (! __m || 00366 (__m->msgid() != MSG_BB_INTERFACE_LIST)) { 00367 if ( __m ) { 00368 __m->unref(); 00369 __m = NULL; 00370 } 00371 __wait_cond->wait(); 00372 } 00373 __wait_mutex->unlock(); 00374 00375 BlackBoardInterfaceListContent *bbilc = __m->msgc<BlackBoardInterfaceListContent>(); 00376 while ( bbilc->has_next() ) { 00377 size_t iisize; 00378 bb_iinfo_msg_t *ii = bbilc->next(&iisize); 00379 infl->append(ii->type, ii->id, ii->hash, ii->serial, 00380 ii->has_writer, ii->num_readers); 00381 } 00382 00383 __m->unref(); 00384 __m = NULL; 00385 00386 return infl; 00387 } 00388 00389 00390 /** We are no longer registered in Fawkes network client. 00391 * Ignored. 00392 * @param id the id of the calling client 00393 */ 00394 void 00395 RemoteBlackBoard::deregistered(unsigned int id) throw() 00396 { 00397 } 00398 00399 00400 void 00401 RemoteBlackBoard::inbound_received(FawkesNetworkMessage *m, 00402 unsigned int id) throw() 00403 { 00404 if ( m->cid() == FAWKES_CID_BLACKBOARD ) { 00405 unsigned int msgid = m->msgid(); 00406 try { 00407 if ( msgid == MSG_BB_DATA_CHANGED ) { 00408 unsigned int serial = ntohl(((unsigned int *)m->payload())[0]); 00409 if ( __proxies.find(serial) != __proxies.end() ) { 00410 __proxies[serial]->process_data_changed(m); 00411 } 00412 } else if (msgid == MSG_BB_INTERFACE_MESSAGE) { 00413 unsigned int serial = ntohl(((unsigned int *)m->payload())[0]); 00414 if ( __proxies.find(serial) != __proxies.end() ) { 00415 __proxies[serial]->process_interface_message(m); 00416 } 00417 } else if (msgid == MSG_BB_READER_ADDED) { 00418 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>(); 00419 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) { 00420 __proxies[ntohl(esm->serial)]->reader_added(ntohl(esm->event_serial)); 00421 } 00422 } else if (msgid == MSG_BB_READER_REMOVED) { 00423 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>(); 00424 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) { 00425 __proxies[ntohl(esm->serial)]->reader_removed(ntohl(esm->event_serial)); 00426 } 00427 } else if (msgid == MSG_BB_WRITER_ADDED) { 00428 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>(); 00429 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) { 00430 __proxies[ntohl(esm->serial)]->writer_added(ntohl(esm->event_serial)); 00431 } 00432 } else if (msgid == MSG_BB_WRITER_REMOVED) { 00433 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>(); 00434 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) { 00435 __proxies[ntohl(esm->serial)]->writer_removed(ntohl(esm->event_serial)); 00436 } 00437 } else { 00438 __wait_mutex->lock(); 00439 __m = m; 00440 __m->ref(); 00441 __wait_cond->wake_all(); 00442 __wait_mutex->unlock(); 00443 } 00444 } catch (Exception &e) { 00445 // Bam, you're dead. Ok, not now, we just ignore that this shit happened... 00446 } 00447 } 00448 } 00449 00450 00451 void 00452 RemoteBlackBoard::connection_died(unsigned int id) throw() 00453 { 00454 // mark all assigned interfaces as invalid 00455 __proxies.lock(); 00456 for (__pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) { 00457 __pit->second->interface()->set_validity(false); 00458 __invalid_proxies.push_back(__pit->second); 00459 } 00460 __proxies.clear(); 00461 __proxies.unlock(); 00462 } 00463 00464 00465 void 00466 RemoteBlackBoard::connection_established(unsigned int id) throw() 00467 { 00468 } 00469 00470 } // end namespace fawkes