Fawkes API  Fawkes Development Version
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
remote.cpp
1 
2 /***************************************************************************
3  * remote.h - Remote BlackBoard access via Fawkes network protocol
4  *
5  * Created: Mon Mar 03 10:53:00 2008
6  * Copyright 2006-2008 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <blackboard/remote.h>
25 #include <blackboard/exceptions.h>
26 #include <blackboard/net/messages.h>
27 #include <blackboard/net/ilist_content.h>
28 #include <blackboard/net/interface_proxy.h>
29 #include <blackboard/internal/notifier.h>
30 #include <blackboard/internal/instance_factory.h>
31 
32 #include <interface/interface_info.h>
33 
34 #include <core/threading/thread.h>
35 #include <core/threading/mutex.h>
36 #include <core/threading/mutex_locker.h>
37 #include <core/threading/wait_condition.h>
38 #include <netcomm/fawkes/client.h>
39 
40 #include <string>
41 #include <cstring>
42 #include <fnmatch.h>
43 #include <arpa/inet.h>
44 
45 namespace fawkes {
46 
47 /** @class RemoteBlackBoard <blackboard/remote.h>
48  * Remote BlackBoard.
49  * This class implements the access to a remote BlackBoard using the Fawkes
50  * network protocol.
51  *
52  * @author Tim Niemueller
53  */
54 
55 /** Constructor.
56  * @param client Fawkes network client to use.
57  */
59 {
60  __fnc = client;
61  __fnc_owner = false;
62 
63  if ( ! __fnc->connected() ) {
64  throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
65  }
66 
67  __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
68 
69  __mutex = new Mutex();
70  __instance_factory = new BlackBoardInstanceFactory();
71 
72  __wait_mutex = new Mutex();
73  __wait_cond = new WaitCondition(__wait_mutex);
74 
75  __inbound_thread = NULL;
76  __m = NULL;
77 }
78 
79 
80 /** Constructor.
81  * This will internall create a fawkes network client that is used to communicate
82  * with the remote BlackBoard.
83  * @param hostname hostname to connect to
84  * @param port port to connect to
85  */
86 RemoteBlackBoard::RemoteBlackBoard(const char *hostname, unsigned short int port)
87 {
88  __fnc = new FawkesNetworkClient(hostname, port);
89  try {
90  __fnc->connect();
91  } catch (Exception &e) {
92  delete __fnc;
93  throw;
94  }
95 
96  __fnc_owner = true;
97 
98  if ( ! __fnc->connected() ) {
99  throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
100  }
101 
102  __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
103 
104  __mutex = new Mutex();
105  __instance_factory = new BlackBoardInstanceFactory();
106 
107  __wait_mutex = new Mutex();
108  __wait_cond = new WaitCondition(__wait_mutex);
109 
110  __inbound_thread = NULL;
111  __m = NULL;
112 }
113 
114 
115 /** Destructor. */
117 {
118  __fnc->deregister_handler(FAWKES_CID_BLACKBOARD);
119  delete __mutex;
120  delete __instance_factory;
121 
122  for ( __pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
123  delete __pit->second;
124  }
125 
126  if (__fnc_owner) {
127  __fnc->disconnect();
128  delete __fnc;
129  }
130 
131  delete __wait_cond;
132  delete __wait_mutex;
133 }
134 
135 
136 bool
138 {
139  return __fnc->connected();
140 }
141 
142 
143 void
144 RemoteBlackBoard::reopen_interfaces()
145 {
146  __proxies.lock();
147  __ipit = __invalid_proxies.begin();
148  while ( __ipit != __invalid_proxies.end() ) {
149  try {
150  Interface *iface = (*__ipit)->interface();
151  open_interface(iface->type(), iface->id(), iface->is_writer(), iface);
152  iface->set_validity(true);
153  __ipit = __invalid_proxies.erase(__ipit);
154  } catch (Exception &e) {
155  // we failed to re-establish validity for the given interface, bad luck
156  ++__ipit;
157  }
158  }
159  __proxies.unlock();
160 }
161 
162 bool
164 {
165  bool rv = true;
166  try {
167  if ( ! __fnc->connected() ) {
168  __fnc->connect();
169 
170  reopen_interfaces();
171  }
172  } catch (...) {
173  rv = false;
174  }
175  return rv;
176 }
177 
178 
179 void
180 RemoteBlackBoard::open_interface(const char *type, const char *identifier,
181  bool writer, Interface *iface)
182 {
183  if ( ! __fnc->connected() ) {
184  throw Exception("Cannot instantiate remote interface, connection is dead");
185  }
186 
187  __mutex->lock();
188  if (__inbound_thread != NULL &&
190  strcmp(Thread::current_thread()->name(), __inbound_thread) == 0)
191  {
192  throw Exception("Cannot call open_interface() from inbound handler");
193  }
194  __mutex->unlock();
195 
196  bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1, sizeof(bb_iopen_msg_t));
197  strncpy(om->type, type, __INTERFACE_TYPE_SIZE);
198  strncpy(om->id, identifier, __INTERFACE_ID_SIZE);
199  memcpy(om->hash, iface->hash(), __INTERFACE_HASH_SIZE);
200 
201  FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
202  writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING,
203  om, sizeof(bb_iopen_msg_t));
204 
205  __wait_mutex->lock();
206  __fnc->enqueue(omsg);
207  while (is_alive() &&
208  (! __m ||
209  ((__m->msgid() != MSG_BB_OPEN_SUCCESS) &&
210  (__m->msgid() != MSG_BB_OPEN_FAILURE))))
211  {
212  if ( __m ) {
213  __m->unref();
214  __m = NULL;
215  }
216  __wait_cond->wait();
217  }
218  __wait_mutex->unlock();
219 
220  if (!is_alive()) {
221  throw Exception("Connection died while trying to open %s::%s",
222  type, identifier);
223  }
224 
225  if ( __m->msgid() == MSG_BB_OPEN_SUCCESS ) {
226  // We got the interface, create internal storage and prepare instance for return
227  BlackBoardInterfaceProxy *proxy = new BlackBoardInterfaceProxy(__fnc, __m, __notifier,
228  iface, writer);
229  __proxies[proxy->serial()] = proxy;
230  } else if ( __m->msgid() == MSG_BB_OPEN_FAILURE ) {
231  bb_iopenfail_msg_t *fm = __m->msg<bb_iopenfail_msg_t>();
232  unsigned int error = ntohl(fm->errno);
233  __m->unref();
234  __m = NULL;
235  if ( error == BB_ERR_WRITER_EXISTS ) {
236  throw BlackBoardWriterActiveException(identifier, type);
237  } else if ( error == BB_ERR_HASH_MISMATCH ) {
238  throw Exception("Hash mismatch for interface %s:%s", type, identifier);
239  } else if ( error == BB_ERR_UNKNOWN_TYPE ) {
240  throw Exception("Type %s unknown (%s::%s)", type, type, identifier);
241  } else if ( error == BB_ERR_WRITER_EXISTS ) {
242  throw BlackBoardWriterActiveException(identifier, type);
243  } else {
244  throw Exception("Could not open interface");
245  }
246  }
247 
248  __m->unref();
249  __m = NULL;
250 }
251 
252 Interface *
253 RemoteBlackBoard::open_interface(const char *type, const char *identifier, bool writer)
254 {
255  if ( ! __fnc->connected() ) {
256  throw Exception("Cannot instantiate remote interface, connection is dead");
257  }
258 
259  Interface *iface = __instance_factory->new_interface_instance(type, identifier);
260  try {
261  open_interface(type, identifier, writer, iface);
262  } catch (Exception &e) {
263  __instance_factory->delete_interface_instance(iface);
264  throw;
265  }
266 
267  return iface;
268 }
269 
270 
271 Interface *
272 RemoteBlackBoard::open_for_reading(const char *type, const char *identifier)
273 {
274  return open_interface(type, identifier, /* writer? */ false);
275 }
276 
277 
278 Interface *
279 RemoteBlackBoard::open_for_writing(const char *type, const char *identifier)
280 {
281  return open_interface(type, identifier, /* writer? */ true);
282 }
283 
284 
285 std::list<Interface *>
287  const char *id_pattern)
288 {
289  std::list<Interface *> rv;
290 
291  InterfaceInfoList *infl = list_all();
292  for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
293  // ensure 0-termination
294  char type[__INTERFACE_TYPE_SIZE + 1];
295  char id[__INTERFACE_ID_SIZE + 1];
296  type[__INTERFACE_TYPE_SIZE] = 0;
297  id[__INTERFACE_TYPE_SIZE] = 0;
298  strncpy(type, i->type(), __INTERFACE_TYPE_SIZE);
299  strncpy(id, i->id(), __INTERFACE_ID_SIZE);
300 
301  if ((fnmatch(type_pattern, type, 0) == FNM_NOMATCH) ||
302  (fnmatch(id_pattern, id, 0) == FNM_NOMATCH) ) {
303  // type or ID prefix does not match, go on
304  continue;
305  }
306 
307  try {
308  Interface *iface = open_for_reading((*i).type(), (*i).id());
309  rv.push_back(iface);
310  } catch (Exception &e) {
311  for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) {
312  close(*j);
313  }
314  throw;
315  }
316  }
317 
318  return rv;
319 }
320 
321 
322 /** Close interface.
323  * @param interface interface to close
324  */
325 void
327 {
328  if ( interface == NULL ) return;
329 
330  unsigned int serial = interface->serial();
331 
332  if ( __proxies.find(serial) != __proxies.end() ) {
333  delete __proxies[serial];
334  __proxies.erase(serial);
335  }
336 
337  if ( __fnc->connected() ) {
338  // We cannot "officially" close it, if we are disconnected it cannot be used anyway
339  bb_iserial_msg_t *sm = (bb_iserial_msg_t *)calloc(1, sizeof(bb_iserial_msg_t));
340  sm->serial = htonl(interface->serial());
341 
342  FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
343  MSG_BB_CLOSE,
344  sm, sizeof(bb_iserial_msg_t));
345  __fnc->enqueue(omsg);
346  }
347 
348  __instance_factory->delete_interface_instance(interface);
349 }
350 
351 
354 {
355  __mutex->lock();
356  if (__inbound_thread != NULL &&
357  strcmp(Thread::current_thread()->name(), __inbound_thread) == 0)
358  {
359  throw Exception("Cannot call list_all() from inbound handler");
360  }
361  __mutex->unlock();
362 
363  InterfaceInfoList *infl = new InterfaceInfoList();
364 
365  FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
366  MSG_BB_LIST_ALL);
367  __wait_mutex->lock();
368  __fnc->enqueue(omsg);
369  while (! __m ||
370  (__m->msgid() != MSG_BB_INTERFACE_LIST)) {
371  if ( __m ) {
372  __m->unref();
373  __m = NULL;
374  }
375  __wait_cond->wait();
376  }
377  __wait_mutex->unlock();
378 
380  while ( bbilc->has_next() ) {
381  size_t iisize;
382  bb_iinfo_msg_t *ii = bbilc->next(&iisize);
383  infl->append(ii->type, ii->id, ii->hash, ii->serial,
384  ii->has_writer, ii->num_readers);
385  }
386 
387  __m->unref();
388  __m = NULL;
389 
390  return infl;
391 }
392 
393 
395 RemoteBlackBoard::list(const char *type_pattern, const char *id_pattern)
396 {
397  __mutex->lock();
398  if (__inbound_thread != NULL &&
399  strcmp(Thread::current_thread()->name(), __inbound_thread) == 0)
400  {
401  throw Exception("Cannot call list() from inbound handler");
402  }
403  __mutex->unlock();
404 
405  InterfaceInfoList *infl = new InterfaceInfoList();
406 
407  bb_ilistreq_msg_t *om =
408  (bb_ilistreq_msg_t *)calloc(1, sizeof(bb_ilistreq_msg_t));
409  strncpy(om->type_pattern, type_pattern, __INTERFACE_TYPE_SIZE);
410  strncpy(om->id_pattern, id_pattern, __INTERFACE_ID_SIZE);
411 
412  FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
413  MSG_BB_LIST,
414  om,
415  sizeof(bb_ilistreq_msg_t));
416 
417  __wait_mutex->lock();
418  __fnc->enqueue(omsg);
419  while (! __m ||
420  (__m->msgid() != MSG_BB_INTERFACE_LIST)) {
421  if ( __m ) {
422  __m->unref();
423  __m = NULL;
424  }
425  __wait_cond->wait();
426  }
427  __wait_mutex->unlock();
428 
431  while ( bbilc->has_next() ) {
432  size_t iisize;
433  bb_iinfo_msg_t *ii = bbilc->next(&iisize);
434  infl->append(ii->type, ii->id, ii->hash, ii->serial,
435  ii->has_writer, ii->num_readers);
436  }
437 
438  __m->unref();
439  __m = NULL;
440 
441  return infl;
442 }
443 
444 
445 /** We are no longer registered in Fawkes network client.
446  * Ignored.
447  * @param id the id of the calling client
448  */
449 void
450 RemoteBlackBoard::deregistered(unsigned int id) throw()
451 {
452 }
453 
454 
455 void
457  unsigned int id) throw()
458 {
459  __mutex->lock();
460  __inbound_thread = Thread::current_thread()->name();
461  __mutex->unlock();
462 
463  if ( m->cid() == FAWKES_CID_BLACKBOARD ) {
464  unsigned int msgid = m->msgid();
465  try {
466  if ( msgid == MSG_BB_DATA_CHANGED ) {
467  unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
468  if ( __proxies.find(serial) != __proxies.end() ) {
469  __proxies[serial]->process_data_changed(m);
470  }
471  } else if (msgid == MSG_BB_INTERFACE_MESSAGE) {
472  unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
473  if ( __proxies.find(serial) != __proxies.end() ) {
474  __proxies[serial]->process_interface_message(m);
475  }
476  } else if (msgid == MSG_BB_READER_ADDED) {
478  if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
479  __proxies[ntohl(esm->serial)]->reader_added(ntohl(esm->event_serial));
480  }
481  } else if (msgid == MSG_BB_READER_REMOVED) {
483  if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
484  __proxies[ntohl(esm->serial)]->reader_removed(ntohl(esm->event_serial));
485  }
486  } else if (msgid == MSG_BB_WRITER_ADDED) {
488  if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
489  __proxies[ntohl(esm->serial)]->writer_added(ntohl(esm->event_serial));
490  }
491  } else if (msgid == MSG_BB_WRITER_REMOVED) {
493  if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
494  __proxies[ntohl(esm->serial)]->writer_removed(ntohl(esm->event_serial));
495  }
496  } else if (msgid == MSG_BB_INTERFACE_CREATED) {
497  bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>();
498  __notifier->notify_of_interface_created(em->type, em->id);
499  } else if (msgid == MSG_BB_INTERFACE_DESTROYED) {
500  bb_ievent_msg_t *em = m->msg<bb_ievent_msg_t>();
501  __notifier->notify_of_interface_destroyed(em->type, em->id);
502  } else {
503  __wait_mutex->lock();
504  __m = m;
505  __m->ref();
506  __wait_cond->wake_all();
507  __wait_mutex->unlock();
508  }
509  } catch (Exception &e) {
510  // Bam, you're dead. Ok, not now, we just ignore that this shit happened...
511  }
512  }
513 
514  __mutex->lock();
515  __inbound_thread = NULL;
516  __mutex->unlock();
517 }
518 
519 
520 void
521 RemoteBlackBoard::connection_died(unsigned int id) throw()
522 {
523  // mark all assigned interfaces as invalid
524  __proxies.lock();
525  for (__pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
526  __pit->second->interface()->set_validity(false);
527  __invalid_proxies.push_back(__pit->second);
528  }
529  __proxies.clear();
530  __proxies.unlock();
531  __wait_cond->wake_all();
532 }
533 
534 
535 void
537 {
538 }
539 
540 } // end namespace fawkes
uint32_t serial
instance serial to unique identify this instance
Definition: messages.h:90
Interface * new_interface_instance(const char *type, const char *identifier)
Creates a new interface instance.
bool has_next()
Check if more list elements are available.
BlackBoard instance factory.
Wait until a given condition holds.
unsigned short serial() const
Get instance serial of interface.
Definition: interface.cpp:670
Requested interface type is unknown.
Definition: messages.h:59
Simple Fawkes network client.
Definition: client.h:51
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:99
char type[__INTERFACE_TYPE_SIZE]
interface type name
Definition: messages.h:102
uint32_t num_readers
number of currently existing readers
Definition: messages.h:94
Message to identify an two interface instances.
Definition: messages.h:120
void unlock()
Unlock the mutex.
Definition: mutex.cpp:135
virtual bool is_alive() const
Check if the BlackBoard is still alive.
Definition: remote.cpp:137
void disconnect()
Disconnect socket.
Definition: client.cpp:503
uint32_t has_writer
1 if the interface currently has a writer, 0 otherwise
Definition: messages.h:92
void register_handler(FawkesNetworkClientHandler *handler, unsigned int component_id)
Register handler.
Definition: client.cpp:620
void enqueue(FawkesNetworkMessage *message)
Enqueue message to send.
Definition: client.cpp:561
const unsigned char * hash() const
Get interface hash.
Definition: interface.cpp:292
Representation of a message that is sent over the network.
Definition: message.h:75
void connect()
Connect to remote.
Definition: client.cpp:415
bb_iinfo_msg_t * next(size_t *size)
Get next plugin from list.
unsigned short int msgid() const
Get message type ID.
Definition: message.cpp:301
virtual void deregistered(unsigned int id)
We are no longer registered in Fawkes network client.
Definition: remote.cpp:450
virtual void connection_established(unsigned int id)
Client has established a connection.
Definition: remote.cpp:536
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
Message for interface info.
Definition: messages.h:86
You tried to open an interface for writing but there is already a writing instance for this interface...
Definition: messages.h:62
uint32_t event_serial
instance serial to unique identify instance that caused the event.
Definition: messages.h:122
virtual InterfaceInfoList * list(const char *type_pattern, const char *id_pattern)
Get list of interfaces matching type and ID patterns.
Definition: remote.cpp:395
const char * id() const
Get identifier of interface.
Definition: interface.cpp:645
Interface information list.
char id[__INTERFACE_ID_SIZE]
interface instance ID
Definition: messages.h:88
virtual void close(Interface *interface)
Close interface.
Definition: remote.cpp:326
char id_pattern[__INTERFACE_ID_SIZE]
ID pattern.
Definition: messages.h:74
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual ~RemoteBlackBoard()
Destructor.
Definition: remote.cpp:116
uint32_t serial
instance serial to unique identify own instance
Definition: messages.h:121
void delete_interface_instance(Interface *interface)
Destroy an interface instance.
char type[__INTERFACE_TYPE_SIZE]
interface type name
Definition: messages.h:87
virtual bool try_aliveness_restore()
Try to restore the aliveness of the BlackBoard instance.
Definition: remote.cpp:163
virtual void inbound_received(FawkesNetworkMessage *msg, unsigned int id)
Called for incoming messages.
Definition: remote.cpp:456
unsigned char hash[__INTERFACE_HASH_SIZE]
interface version hash
Definition: messages.h:89
The hashes of the interfaces do not match.
Definition: messages.h:60
static Thread * current_thread()
Get the Thread instance of the currently running thread.
Definition: thread.cpp:1295
bool connected() const
Check if connection is alive.
Definition: client.cpp:797
BlackBoardNotifier * __notifier
Notifier for BB events.
Definition: blackboard.h:102
virtual void connection_died(unsigned int id)
Client connection died.
Definition: remote.cpp:521
void wait()
Wait for the condition forever.
const char * name() const
Get name of thread.
Definition: thread.h:95
MT * msgc() const
Get correctly parsed output.
Definition: message.h:154
virtual Interface * open_for_reading(const char *interface_type, const char *identifier)
Open interface for reading.
Definition: remote.cpp:272
MT * msg() const
Get correctly casted payload.
Definition: message.h:115
uint32_t serial
instance serial to unique identify this instance
Definition: messages.h:112
RemoteBlackBoard(FawkesNetworkClient *client)
Constructor.
Definition: remote.cpp:58
char type_pattern[__INTERFACE_TYPE_SIZE]
type pattern
Definition: messages.h:73
virtual InterfaceInfoList * list_all()
Get list of all currently existing interfaces.
Definition: remote.cpp:353
void append(const char *type, const char *id, const unsigned char *hash, unsigned int serial, bool has_writer, unsigned int num_readers)
Append an interface info.
void deregister_handler(unsigned int component_id)
Deregister handler.
Definition: client.cpp:639
Message for interface events.
Definition: messages.h:101
void set_validity(bool valid)
Mark this interface invalid.
Definition: interface.cpp:449
Message to request constrained interface list.
Definition: messages.h:72
BlackBoard interface list content.
Definition: ilist_content.h:35
Message to identify an interface instance.
Definition: messages.h:111
void lock()
Lock this mutex.
Definition: mutex.cpp:89
bool is_writer() const
Check if this is a writing instance.
Definition: interface.cpp:435
Mutex mutual exclusion lock.
Definition: mutex.h:32
std::list< Interface * > open_multiple_for_reading(const char *interface_type, const char *id_pattern="*")
Open multiple interfaces for reading.
Definition: remote.cpp:286
char id[__INTERFACE_ID_SIZE]
interface instance ID
Definition: messages.h:103
const char * type() const
Get type of interface.
Definition: interface.cpp:635
virtual Interface * open_for_writing(const char *interface_type, const char *identifier)
Open interface for writing.
Definition: remote.cpp:279