Fawkes API Fawkes Development Version

sync_thread.cpp

00001 
00002 /***************************************************************************
00003  *  sync_thread.cpp - Fawkes BlackBoard Synchronization Thread
00004  *
00005  *  Created: Thu Jun 04 18:13:06 2009
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU Library General Public License for more details.
00019  *
00020  *  Read the full text in the LICENSE.GPL file in the doc directory.
00021  */
00022 
00023 #include "sync_thread.h"
00024 
00025 #include <blackboard/remote.h>
00026 #include <core/threading/mutex_locker.h>
00027 #include <utils/time/wait.h>
00028 
00029 #include <cstring>
00030 
00031 using namespace std;
00032 using namespace fawkes;
00033 
00034 /** @class BlackBoardSynchronizationThread "sync_thread.h"
00035  * Thread to synchronize two BlackBoards.
00036  * @author Tim Niemueller
00037  */
00038 
00039 /** Constructor.
00040  * @param bbsync_cfg_prefix Configuration prefix for the whole bbsync plugin
00041  * @param peer_cfg_prefix The configuration prefix for the peer this sync thread
00042  * has been created for.
00043  * @param peer name of the peer configuration for this thread
00044  */
00045 BlackBoardSynchronizationThread::BlackBoardSynchronizationThread(std::string &bbsync_cfg_prefix,
00046                                                                  std::string &peer_cfg_prefix,
00047                                                                  std::string &peer)
00048   : Thread("", Thread::OPMODE_CONTINUOUS)
00049 {
00050   set_name("BBSyncThread[%s]", peer.c_str());
00051   set_prepfin_conc_loop(true);
00052 
00053   __bbsync_cfg_prefix = bbsync_cfg_prefix;
00054   __peer_cfg_prefix   = peer_cfg_prefix;
00055   __peer              = peer;
00056 
00057   __remote_bb = NULL;
00058 }
00059 
00060 
00061 /** Destructor. */
00062 BlackBoardSynchronizationThread::~BlackBoardSynchronizationThread()
00063 {
00064 }
00065 
00066 void
00067 BlackBoardSynchronizationThread::init()
00068 {
00069   logger->log_debug(name(), "Initializing");
00070   unsigned int check_interval = 0;
00071   try {
00072     __host = config->get_string((__peer_cfg_prefix + "host").c_str());
00073     __port = config->get_uint((__peer_cfg_prefix + "port").c_str());
00074 
00075     check_interval = config->get_uint((__bbsync_cfg_prefix + "check_interval").c_str());
00076   } catch (Exception &e) {
00077     e.append("Host or port not specified for peer");
00078     throw;
00079   }
00080 
00081   try {
00082     check_interval = config->get_uint((__peer_cfg_prefix + "check_interval").c_str());
00083     logger->log_debug(name(), "Peer check interval set, overriding default.");
00084   } catch (Exception &e) {
00085     logger->log_debug(name(), "No per-peer check interval set, using default");
00086   }
00087 
00088   read_config_combos(__peer_cfg_prefix + "reading/", /* writing */ false);
00089   read_config_combos(__peer_cfg_prefix + "writing/", /* writing */ true);
00090 
00091   for (ComboMap::iterator i = __combos.begin(); i != __combos.end(); ++i) {
00092     logger->log_debug(name(), "Combo: %s, %s (%s, R) -> %s (%s, W)", i->second.type.c_str(),
00093                       i->second.reader_id.c_str(), i->second.remote_writer ? "local" : "remote",
00094                       i->second.writer_id.c_str(), i->second.remote_writer ? "remote" : "local");
00095   }
00096 
00097   __wsl_local  = new SyncWriterInterfaceListener(this, logger, (__peer + "/local").c_str());
00098   __wsl_remote = new SyncWriterInterfaceListener(this, logger, (__peer + "/remote").c_str());
00099 
00100   if (! check_connection()) {
00101     logger->log_warn(name(), "Remote peer not reachable, will keep trying");
00102   }
00103 
00104   logger->log_debug(name(), "Checking for remote aliveness every %u ms", check_interval);
00105   __timewait = new TimeWait(clock, check_interval * 1000);
00106 }
00107 
00108 
00109 void
00110 BlackBoardSynchronizationThread::finalize()
00111 {
00112 
00113   delete __timewait;
00114 
00115   close_interfaces();
00116 
00117   delete __wsl_local;
00118   delete __wsl_remote;
00119   delete __remote_bb;
00120   __remote_bb = NULL;
00121 }
00122 
00123 
00124 void
00125 BlackBoardSynchronizationThread::loop()
00126 {
00127   __timewait->mark_start();
00128   check_connection();
00129   __timewait->wait_systime();
00130 }
00131 
00132 
00133 bool
00134 BlackBoardSynchronizationThread::check_connection()
00135 {
00136   if (! __remote_bb || ! __remote_bb->is_alive()) {
00137     if (__remote_bb) {
00138       logger->log_warn(name(), "Lost connection via remote BB to %s (%s:%u), will try to re-establish",
00139                        __peer.c_str(), __host.c_str(), __port);
00140       blackboard->unregister_listener(__wsl_local);
00141       __remote_bb->unregister_listener(__wsl_remote);
00142       close_interfaces();
00143       delete __remote_bb;
00144       __remote_bb = NULL;
00145     }
00146 
00147     try {
00148       __remote_bb = new RemoteBlackBoard(__host.c_str(), __port);
00149       logger->log_info(name(), "Successfully connected via remote BB to %s (%s:%u)",
00150                        __peer.c_str(), __host.c_str(), __port);
00151 
00152       open_interfaces();
00153       blackboard->register_listener(__wsl_local, BlackBoard::BBIL_FLAG_WRITER);
00154       __remote_bb->register_listener(__wsl_remote, BlackBoard::BBIL_FLAG_WRITER);
00155     } catch (Exception &e) {
00156       e.print_trace();
00157       return false;
00158     }
00159   }
00160   return true;
00161 }
00162 
00163 void
00164 BlackBoardSynchronizationThread::read_config_combos(std::string prefix, bool writing)
00165 {
00166   Configuration::ValueIterator *i = config->search(prefix.c_str());
00167   while (i->next()) {
00168     if (strcmp(i->type(), "string") != 0) {
00169       TypeMismatchException e("Only values of type string may occur in %s, "
00170                               "but found value of type %s",
00171                               prefix.c_str(), i->type());
00172       delete i;
00173       throw e;
00174     }
00175 
00176     std::string varname = std::string(i->path()).substr(prefix.length());
00177     std::string uid     = i->get_string();
00178     size_t sf;
00179 
00180     if ((sf = uid.find("::")) == std::string::npos) {
00181       delete i;
00182       throw Exception("Interface UID '%s' at %s is not valid, missing double colon",
00183                       uid.c_str(), i->path());
00184     }
00185 
00186     std::string type = uid.substr(0, sf);
00187     std::string id = uid.substr(sf + 2);
00188     combo_t combo = {  type, id, id, writing };
00189 
00190     if ( (sf = id.find("=")) != std::string::npos) {
00191       // we got a mapping
00192       if ( writing ) {
00193         combo.writer_id = id.substr(0, sf);
00194         combo.reader_id = id.substr(sf + 1);
00195       } else {
00196         combo.reader_id = id.substr(0, sf);
00197         combo.writer_id = id.substr(sf + 1);
00198       }
00199     }
00200 
00201     __combos[varname] = combo;
00202   }
00203   delete i;
00204 }
00205 
00206 
00207 void
00208 BlackBoardSynchronizationThread::open_interfaces()
00209 {
00210   logger->log_debug(name(), "Opening interfaces");
00211   MutexLocker lock(__interfaces.mutex());
00212 
00213   ComboMap::iterator i;
00214   for (i = __combos.begin(); i != __combos.end(); ++i) {
00215     Interface *iface_reader = NULL, *iface_writer = NULL;
00216 
00217     BlackBoard *writer_bb = i->second.remote_writer ? __remote_bb : blackboard;
00218     BlackBoard *reader_bb = i->second.remote_writer ? blackboard  : __remote_bb;
00219 
00220     try {
00221       logger->log_debug(name(), "Opening reading %s (%s:%s)",
00222                         i->second.remote_writer ? "locally" : "remotely",
00223                         i->second.type.c_str(), i->second.reader_id.c_str());
00224       iface_reader = reader_bb->open_for_reading(i->second.type.c_str(),
00225                                                  i->second.reader_id.c_str());
00226 
00227       if (iface_reader->has_writer()) {
00228         logger->log_debug(name(), "Opening writing on %s (%s:%s)",
00229                           i->second.remote_writer ? "remotely" : "locally",
00230                           i->second.type.c_str(), i->second.writer_id.c_str());
00231         iface_writer = writer_bb->open_for_writing(i->second.type.c_str(),
00232                                                    i->second.writer_id.c_str());
00233       }
00234 
00235       InterfaceInfo ii(&i->second, iface_writer, reader_bb, writer_bb);
00236       __interfaces[iface_reader] = ii;
00237 
00238     } catch (Exception &e) {
00239       reader_bb->close(iface_reader);
00240       writer_bb->close(iface_writer);
00241       throw;
00242     }
00243 
00244     SyncInterfaceListener *sync_listener = NULL;
00245     if (iface_writer) {
00246       logger->log_debug(name(), "Creating sync listener");
00247       sync_listener = new SyncInterfaceListener(logger, iface_reader, iface_writer,
00248                                                 reader_bb, writer_bb);
00249     }
00250     __sync_listeners[iface_reader] = sync_listener;
00251 
00252     if (i->second.remote_writer) {
00253       __wsl_local->add_interface(iface_reader);
00254     } else {
00255       __wsl_remote->add_interface(iface_reader);
00256     }
00257   }
00258 }
00259 
00260 
00261 void
00262 BlackBoardSynchronizationThread::close_interfaces()
00263 {
00264   SyncListenerMap::iterator s;
00265   for (s = __sync_listeners.begin(); s != __sync_listeners.end(); ++s) {
00266     if (s->second) {
00267       logger->log_debug(name(), "Closing sync listener %s", s->second->bbil_name());
00268       delete s->second;
00269     }
00270   }
00271   MutexLocker lock(__interfaces.mutex());
00272   InterfaceMap::iterator i;
00273   for (i = __interfaces.begin(); i != __interfaces.end(); ++i) {
00274     logger->log_debug(name(), "Closing %s reading interface %s",
00275                       i->second.combo->remote_writer ? "local" : "remote",
00276                       i->first->uid());
00277     if (i->second.combo->remote_writer) {
00278       __wsl_local->remove_interface(i->first);
00279       blackboard->close(i->first);
00280     } else {
00281       __wsl_remote->remove_interface(i->first);
00282       __remote_bb->close(i->first);
00283     }
00284     if (i->second.writer) {
00285       logger->log_debug(name(), "Closing %s writing interface %s",
00286                         i->second.combo->remote_writer ? "remote" : "local",
00287                         i->second.writer->uid());
00288       if (i->second.combo->remote_writer) {
00289         __remote_bb->close(i->second.writer);
00290       } else {
00291         blackboard->close(i->second.writer);
00292       }
00293     }
00294   }
00295   __interfaces.clear();
00296   __sync_listeners.clear();
00297 }
00298 
00299 
00300 /** A writer has been added for an interface.
00301  * To be called only by SyncWriterInterfaceListener.
00302  * @param interface the interface a writer has been added for.
00303  */
00304 void
00305 BlackBoardSynchronizationThread::writer_added(fawkes::Interface *interface) throw()
00306 {
00307   MutexLocker lock(__interfaces.mutex());
00308 
00309   if (__interfaces[interface].writer) {
00310     // There exists a writer!?
00311     logger->log_warn(name(), "Writer added for %s, but relay exists already. Bug?", interface->uid());
00312   } else {
00313     logger->log_warn(name(), "Writer added for %s, opening relay writer", interface->uid());
00314 
00315     Interface *iface = NULL;
00316     SyncInterfaceListener *sync_listener = NULL;
00317     InterfaceInfo &ii = __interfaces[interface];
00318     try {
00319       iface = ii.writer_bb->open_for_writing(ii.combo->type.c_str(),
00320                                              ii.combo->writer_id.c_str());
00321       
00322       logger->log_debug(name(), "Creating sync listener for %s:%s-%s",
00323                         ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
00324                         ii.combo->writer_id.c_str());
00325 
00326       sync_listener = new SyncInterfaceListener(logger, interface, iface,
00327                                                 ii.reader_bb, ii.writer_bb);
00328 
00329       __sync_listeners[interface] = sync_listener;
00330       ii.writer = iface;
00331 
00332     } catch (Exception &e) {
00333       delete sync_listener;
00334       ii.writer_bb->close(iface);
00335       logger->log_error(name(), "Failed to open writer for %s:%s-%s, sync broken",
00336                         ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
00337                         ii.combo->writer_id.c_str());
00338       logger->log_error(name(), e);
00339     }
00340   }
00341 }
00342 
00343 
00344 /** A writer has been removed for an interface.
00345  * To be called only by SyncWriterInterfaceListener.
00346  * @param interface the interface a writer has been removed for.
00347  */
00348 void
00349 BlackBoardSynchronizationThread::writer_removed(fawkes::Interface *interface) throw()
00350 {
00351   MutexLocker lock(__interfaces.mutex());
00352 
00353   if (! __interfaces[interface].writer) {
00354     // We do not have a writer!?
00355     logger->log_warn(name(), "Writer removed for %s, but no relay exists. Bug?", interface->uid());
00356   } else {
00357     logger->log_warn(name(), "Writer removed for %s, closing relay writer", interface->uid());
00358 
00359     InterfaceInfo &ii = __interfaces[interface];
00360     try {
00361       delete __sync_listeners[interface];
00362       __sync_listeners[interface] = NULL;
00363 
00364       ii.writer_bb->close(ii.writer);
00365       ii.writer = NULL;
00366 
00367     } catch (Exception &e) {
00368       logger->log_error(name(), "Failed to close writer for %s:%s-%s, sync broken",
00369                         ii.combo->type.c_str(), ii.combo->reader_id.c_str(),
00370                         ii.combo->writer_id.c_str());
00371       logger->log_error(name(), e);
00372     }
00373   }
00374 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends