00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <blackboard/remote.h>
00025 #include <blackboard/exceptions.h>
00026 #include <blackboard/net/messages.h>
00027 #include <blackboard/net/ilist_content.h>
00028 #include <blackboard/net/interface_proxy.h>
00029 #include <blackboard/internal/notifier.h>
00030 #include <blackboard/internal/instance_factory.h>
00031
00032 #include <interface/interface_info.h>
00033
00034 #include <core/threading/mutex.h>
00035 #include <core/threading/mutex_locker.h>
00036 #include <core/threading/wait_condition.h>
00037 #include <netcomm/fawkes/client.h>
00038
00039 #include <string>
00040 #include <cstring>
00041 #include <fnmatch.h>
00042 #include <arpa/inet.h>
00043
00044 namespace fawkes {
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057 RemoteBlackBoard::RemoteBlackBoard(FawkesNetworkClient *client)
00058 {
00059 __fnc = client;
00060 __fnc_owner = false;
00061
00062 if ( ! __fnc->connected() ) {
00063 throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
00064 }
00065
00066 __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
00067
00068 __mutex = new Mutex();
00069 __notifier = new BlackBoardNotifier();
00070 __instance_factory = new BlackBoardInstanceFactory();
00071
00072 __wait_mutex = new Mutex();
00073 __wait_cond = new WaitCondition(__wait_mutex);
00074
00075 __m = NULL;
00076 }
00077
00078
00079
00080
00081
00082
00083
00084
00085 RemoteBlackBoard::RemoteBlackBoard(const char *hostname, unsigned short int port)
00086 {
00087 __fnc = new FawkesNetworkClient(hostname, port);
00088 try {
00089 __fnc->connect();
00090 } catch (Exception &e) {
00091 delete __fnc;
00092 throw;
00093 }
00094
00095 __fnc_owner = true;
00096
00097 if ( ! __fnc->connected() ) {
00098 throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
00099 }
00100
00101 __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
00102
00103 __mutex = new Mutex();
00104 __notifier = new BlackBoardNotifier();
00105 __instance_factory = new BlackBoardInstanceFactory();
00106
00107 __wait_mutex = new Mutex();
00108 __wait_cond = new WaitCondition(__wait_mutex);
00109
00110 __m = NULL;
00111 }
00112
00113
00114
00115 RemoteBlackBoard::~RemoteBlackBoard()
00116 {
00117 __fnc->deregister_handler(FAWKES_CID_BLACKBOARD);
00118 delete __mutex;
00119 delete __notifier;
00120 delete __instance_factory;
00121
00122 for ( __pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
00123 delete __pit->second;
00124 }
00125
00126 if (__fnc_owner) {
00127 __fnc->disconnect();
00128 delete __fnc;
00129 }
00130
00131 delete __wait_cond;
00132 delete __wait_mutex;
00133 }
00134
00135
00136 bool
00137 RemoteBlackBoard::is_alive() const throw()
00138 {
00139 return __fnc->connected();
00140 }
00141
00142
00143 void
00144 RemoteBlackBoard::reopen_interfaces()
00145 {
00146 __proxies.lock();
00147 __ipit = __invalid_proxies.begin();
00148 while ( __ipit != __invalid_proxies.end() ) {
00149 try {
00150 Interface *iface = (*__ipit)->interface();
00151 open_interface(iface->type(), iface->id(), iface->is_writer(), iface);
00152 iface->set_validity(true);
00153 __ipit = __invalid_proxies.erase(__ipit);
00154 } catch (Exception &e) {
00155
00156 ++__ipit;
00157 }
00158 }
00159 __proxies.unlock();
00160 }
00161
00162 bool
00163 RemoteBlackBoard::try_aliveness_restore() throw()
00164 {
00165 bool rv = true;
00166 try {
00167 if ( ! __fnc->connected() ) {
00168 __fnc->connect();
00169
00170 reopen_interfaces();
00171 }
00172 } catch (...) {
00173 rv = false;
00174 }
00175 return rv;
00176 }
00177
00178
00179 void
00180 RemoteBlackBoard::open_interface(const char *type, const char *identifier,
00181 bool writer, Interface *iface)
00182 {
00183 if ( ! __fnc->connected() ) {
00184 throw Exception("Cannot instantiate remote interface, connection is dead");
00185 }
00186
00187 bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1, sizeof(bb_iopen_msg_t));
00188 strncpy(om->type, type, __INTERFACE_TYPE_SIZE);
00189 strncpy(om->id, identifier, __INTERFACE_ID_SIZE);
00190 memcpy(om->hash, iface->hash(), __INTERFACE_HASH_SIZE);
00191
00192 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00193 writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING,
00194 om, sizeof(bb_iopen_msg_t));
00195
00196 __wait_mutex->lock();
00197 __fnc->enqueue(omsg);
00198 while (! __m ||
00199 ((__m->msgid() != MSG_BB_OPEN_SUCCESS) &&
00200 (__m->msgid() != MSG_BB_OPEN_FAILURE))) {
00201 if ( __m ) {
00202 __m->unref();
00203 __m = NULL;
00204 }
00205 __wait_cond->wait();
00206 }
00207 __wait_mutex->unlock();
00208
00209 if ( __m->msgid() == MSG_BB_OPEN_SUCCESS ) {
00210
00211 BlackBoardInterfaceProxy *proxy = new BlackBoardInterfaceProxy(__fnc, __m, __notifier,
00212 iface, writer);
00213 __proxies[proxy->serial()] = proxy;
00214 } else if ( __m->msgid() == MSG_BB_OPEN_FAILURE ) {
00215 bb_iopenfail_msg_t *fm = __m->msg<bb_iopenfail_msg_t>();
00216 unsigned int error = ntohl(fm->errno);
00217 __m->unref();
00218 __m = NULL;
00219 if ( error == BB_ERR_WRITER_EXISTS ) {
00220 throw BlackBoardWriterActiveException(identifier, type);
00221 } else if ( error == BB_ERR_HASH_MISMATCH ) {
00222 throw Exception("Hash mismatch for interface %s:%s", type, identifier);
00223 } else if ( error == BB_ERR_UNKNOWN_TYPE ) {
00224 throw Exception("Type %s unknoen (%s:%s)", type, type, identifier);
00225 } else if ( error == BB_ERR_WRITER_EXISTS ) {
00226 throw BlackBoardWriterActiveException(identifier, type);
00227 } else {
00228 throw Exception("Could not open interface");
00229 }
00230 }
00231
00232 __m->unref();
00233 __m = NULL;
00234 }
00235
00236 Interface *
00237 RemoteBlackBoard::open_interface(const char *type, const char *identifier, bool writer)
00238 {
00239 if ( ! __fnc->connected() ) {
00240 throw Exception("Cannot instantiate remote interface, connection is dead");
00241 }
00242
00243 Interface *iface = __instance_factory->new_interface_instance(type, identifier);
00244 try {
00245 open_interface(type, identifier, writer, iface);
00246 } catch (...) {
00247 __instance_factory->delete_interface_instance(iface);
00248 throw;
00249 }
00250
00251 return iface;
00252 }
00253
00254
00255 Interface *
00256 RemoteBlackBoard::open_for_reading(const char *type, const char *identifier)
00257 {
00258 return open_interface(type, identifier, false);
00259 }
00260
00261
00262 Interface *
00263 RemoteBlackBoard::open_for_writing(const char *type, const char *identifier)
00264 {
00265 return open_interface(type, identifier, true);
00266 }
00267
00268
00269 std::list<Interface *>
00270 RemoteBlackBoard::open_multiple_for_reading(const char *type, const char *id_pattern)
00271 {
00272 std::list<Interface *> rv;
00273
00274 InterfaceInfoList *infl = list_all();
00275 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
00276 if ((strncmp(type, i->type(), __INTERFACE_TYPE_SIZE) != 0) ||
00277 (fnmatch(id_pattern, i->id(), 0) == FNM_NOMATCH) ) {
00278
00279 continue;
00280 }
00281
00282 try {
00283 Interface *iface = open_for_reading((*i).type(), (*i).id());
00284 rv.push_back(iface);
00285 } catch (Exception &e) {
00286 for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) {
00287 close(*j);
00288 }
00289 throw;
00290 }
00291 }
00292
00293 return rv;
00294 }
00295
00296
00297
00298
00299
00300 void
00301 RemoteBlackBoard::close(Interface *interface)
00302 {
00303 if ( interface == NULL ) return;
00304
00305 unsigned int serial = interface->serial();
00306
00307 if ( __proxies.find(serial) != __proxies.end() ) {
00308 delete __proxies[serial];
00309 __proxies.erase(serial);
00310 }
00311
00312 if ( __fnc->connected() ) {
00313
00314 bb_iserial_msg_t *sm = (bb_iserial_msg_t *)calloc(1, sizeof(bb_iserial_msg_t));
00315 sm->serial = htonl(interface->serial());
00316
00317 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00318 MSG_BB_CLOSE,
00319 sm, sizeof(bb_iserial_msg_t));
00320 __fnc->enqueue(omsg);
00321 }
00322
00323 __instance_factory->delete_interface_instance(interface);
00324 }
00325
00326
00327 void
00328 RemoteBlackBoard::register_listener(BlackBoardInterfaceListener *listener, unsigned int flags)
00329 {
00330 __notifier->register_listener(listener, flags);
00331 }
00332
00333
00334 void
00335 RemoteBlackBoard::unregister_listener(BlackBoardInterfaceListener *listener)
00336 {
00337 __notifier->unregister_listener(listener);
00338 }
00339
00340
00341 void
00342 RemoteBlackBoard::register_observer(BlackBoardInterfaceObserver *observer, unsigned int flags)
00343 {
00344 __notifier->register_observer(observer, flags);
00345 }
00346
00347
00348 void
00349 RemoteBlackBoard::unregister_observer(BlackBoardInterfaceObserver *observer)
00350 {
00351 __notifier->unregister_observer(observer);
00352 }
00353
00354
00355 InterfaceInfoList *
00356 RemoteBlackBoard::list_all()
00357 {
00358 MutexLocker lock(__mutex);
00359 InterfaceInfoList *infl = new InterfaceInfoList();
00360
00361 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00362 MSG_BB_LIST_ALL);
00363 __wait_mutex->lock();
00364 __fnc->enqueue(omsg);
00365 while (! __m ||
00366 (__m->msgid() != MSG_BB_INTERFACE_LIST)) {
00367 if ( __m ) {
00368 __m->unref();
00369 __m = NULL;
00370 }
00371 __wait_cond->wait();
00372 }
00373 __wait_mutex->unlock();
00374
00375 BlackBoardInterfaceListContent *bbilc = __m->msgc<BlackBoardInterfaceListContent>();
00376 while ( bbilc->has_next() ) {
00377 size_t iisize;
00378 bb_iinfo_msg_t *ii = bbilc->next(&iisize);
00379 infl->append(ii->type, ii->id, ii->hash, ii->serial,
00380 ii->has_writer, ii->num_readers);
00381 }
00382
00383 __m->unref();
00384 __m = NULL;
00385
00386 return infl;
00387 }
00388
00389
00390
00391
00392
00393
00394 void
00395 RemoteBlackBoard::deregistered(unsigned int id) throw()
00396 {
00397 }
00398
00399
00400 void
00401 RemoteBlackBoard::inbound_received(FawkesNetworkMessage *m,
00402 unsigned int id) throw()
00403 {
00404 if ( m->cid() == FAWKES_CID_BLACKBOARD ) {
00405 unsigned int msgid = m->msgid();
00406 try {
00407 if ( msgid == MSG_BB_DATA_CHANGED ) {
00408 unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
00409 if ( __proxies.find(serial) != __proxies.end() ) {
00410 __proxies[serial]->process_data_changed(m);
00411 }
00412 } else if (msgid == MSG_BB_INTERFACE_MESSAGE) {
00413 unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
00414 if ( __proxies.find(serial) != __proxies.end() ) {
00415 __proxies[serial]->process_interface_message(m);
00416 }
00417 } else if (msgid == MSG_BB_READER_ADDED) {
00418 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00419 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00420 __proxies[ntohl(esm->serial)]->reader_added(ntohl(esm->event_serial));
00421 }
00422 } else if (msgid == MSG_BB_READER_REMOVED) {
00423 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00424 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00425 __proxies[ntohl(esm->serial)]->reader_removed(ntohl(esm->event_serial));
00426 }
00427 } else if (msgid == MSG_BB_WRITER_ADDED) {
00428 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00429 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00430 __proxies[ntohl(esm->serial)]->writer_added(ntohl(esm->event_serial));
00431 }
00432 } else if (msgid == MSG_BB_WRITER_REMOVED) {
00433 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00434 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00435 __proxies[ntohl(esm->serial)]->writer_removed(ntohl(esm->event_serial));
00436 }
00437 } else {
00438 __wait_mutex->lock();
00439 __m = m;
00440 __m->ref();
00441 __wait_cond->wake_all();
00442 __wait_mutex->unlock();
00443 }
00444 } catch (Exception &e) {
00445
00446 }
00447 }
00448 }
00449
00450
00451 void
00452 RemoteBlackBoard::connection_died(unsigned int id) throw()
00453 {
00454
00455 __proxies.lock();
00456 for (__pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
00457 __pit->second->interface()->set_validity(false);
00458 __invalid_proxies.push_back(__pit->second);
00459 }
00460 __proxies.clear();
00461 __proxies.unlock();
00462 }
00463
00464
00465 void
00466 RemoteBlackBoard::connection_established(unsigned int id) throw()
00467 {
00468 }
00469
00470 }