Fawkes API Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * sync_thread.cpp - Fawkes BlackBoard Synchronization Thread 00004 * 00005 * Created: Thu Jun 04 18:13:06 2009 00006 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. 00014 * 00015 * This program is distributed in the hope that it will be useful, 00016 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00018 * GNU Library General Public License for more details. 00019 * 00020 * Read the full text in the LICENSE.GPL file in the doc directory. 00021 */ 00022 00023 #include "sync_thread.h" 00024 00025 #include <blackboard/remote.h> 00026 #include <core/threading/mutex_locker.h> 00027 #include <utils/time/wait.h> 00028 00029 #include <cstring> 00030 00031 using namespace std; 00032 using namespace fawkes; 00033 00034 /** @class BlackBoardSynchronizationThread "sync_thread.h" 00035 * Thread to synchronize two BlackBoards. 00036 * @author Tim Niemueller 00037 */ 00038 00039 /** Constructor. 00040 * @param bbsync_cfg_prefix Configuration prefix for the whole bbsync plugin 00041 * @param peer_cfg_prefix The configuration prefix for the peer this sync thread 00042 * has been created for. 00043 * @param peer name of the peer configuration for this thread 00044 */ 00045 BlackBoardSynchronizationThread::BlackBoardSynchronizationThread(std::string &bbsync_cfg_prefix, 00046 std::string &peer_cfg_prefix, 00047 std::string &peer) 00048 : Thread("", Thread::OPMODE_CONTINUOUS) 00049 { 00050 set_name("BBSyncThread[%s]", peer.c_str()); 00051 set_prepfin_conc_loop(true); 00052 00053 __bbsync_cfg_prefix = bbsync_cfg_prefix; 00054 __peer_cfg_prefix = peer_cfg_prefix; 00055 __peer = peer; 00056 00057 __remote_bb = NULL; 00058 } 00059 00060 00061 /** Destructor. */ 00062 BlackBoardSynchronizationThread::~BlackBoardSynchronizationThread() 00063 { 00064 } 00065 00066 void 00067 BlackBoardSynchronizationThread::init() 00068 { 00069 logger->log_debug(name(), "Initializing"); 00070 unsigned int check_interval = 0; 00071 try { 00072 __host = config->get_string((__peer_cfg_prefix + "host").c_str()); 00073 __port = config->get_uint((__peer_cfg_prefix + "port").c_str()); 00074 00075 check_interval = config->get_uint((__bbsync_cfg_prefix + "check_interval").c_str()); 00076 } catch (Exception &e) { 00077 e.append("Host or port not specified for peer"); 00078 throw; 00079 } 00080 00081 try { 00082 check_interval = config->get_uint((__peer_cfg_prefix + "check_interval").c_str()); 00083 logger->log_debug(name(), "Peer check interval set, overriding default."); 00084 } catch (Exception &e) { 00085 logger->log_debug(name(), "No per-peer check interval set, using default"); 00086 } 00087 00088 read_config_combos(__peer_cfg_prefix + "reading/", /* writing */ false); 00089 read_config_combos(__peer_cfg_prefix + "writing/", /* writing */ true); 00090 00091 for (ComboMap::iterator i = __combos.begin(); i != __combos.end(); ++i) { 00092 logger->log_debug(name(), "Combo: %s, %s (%s, R) -> %s (%s, W)", i->second.type.c_str(), 00093 i->second.reader_id.c_str(), i->second.remote_writer ? "local" : "remote", 00094 i->second.writer_id.c_str(), i->second.remote_writer ? "remote" : "local"); 00095 } 00096 00097 __wsl_local = new SyncWriterInterfaceListener(this, logger, (__peer + "/local").c_str()); 00098 __wsl_remote = new SyncWriterInterfaceListener(this, logger, (__peer + "/remote").c_str()); 00099 00100 if (! check_connection()) { 00101 logger->log_warn(name(), "Remote peer not reachable, will keep trying"); 00102 } 00103 00104 logger->log_debug(name(), "Checking for remote aliveness every %u ms", check_interval); 00105 __timewait = new TimeWait(clock, check_interval * 1000); 00106 } 00107 00108 00109 void 00110 BlackBoardSynchronizationThread::finalize() 00111 { 00112 00113 delete __timewait; 00114 00115 close_interfaces(); 00116 00117 delete __wsl_local; 00118 delete __wsl_remote; 00119 delete __remote_bb; 00120 __remote_bb = NULL; 00121 } 00122 00123 00124 void 00125 BlackBoardSynchronizationThread::loop() 00126 { 00127 __timewait->mark_start(); 00128 check_connection(); 00129 __timewait->wait_systime(); 00130 } 00131 00132 00133 bool 00134 BlackBoardSynchronizationThread::check_connection() 00135 { 00136 if (! __remote_bb || ! __remote_bb->is_alive()) { 00137 if (__remote_bb) { 00138 logger->log_warn(name(), "Lost connection via remote BB to %s (%s:%u), will try to re-establish", 00139 __peer.c_str(), __host.c_str(), __port); 00140 blackboard->unregister_listener(__wsl_local); 00141 __remote_bb->unregister_listener(__wsl_remote); 00142 close_interfaces(); 00143 delete __remote_bb; 00144 __remote_bb = NULL; 00145 } 00146 00147 try { 00148 __remote_bb = new RemoteBlackBoard(__host.c_str(), __port); 00149 logger->log_info(name(), "Successfully connected via remote BB to %s (%s:%u)", 00150 __peer.c_str(), __host.c_str(), __port); 00151 00152 open_interfaces(); 00153 blackboard->register_listener(__wsl_local, BlackBoard::BBIL_FLAG_WRITER); 00154 __remote_bb->register_listener(__wsl_remote, BlackBoard::BBIL_FLAG_WRITER); 00155 } catch (Exception &e) { 00156 e.print_trace(); 00157 return false; 00158 } 00159 } 00160 return true; 00161 } 00162 00163 void 00164 BlackBoardSynchronizationThread::read_config_combos(std::string prefix, bool writing) 00165 { 00166 Configuration::ValueIterator *i = config->search(prefix.c_str()); 00167 while (i->next()) { 00168 if (strcmp(i->type(), "string") != 0) { 00169 TypeMismatchException e("Only values of type string may occur in %s, " 00170 "but found value of type %s", 00171 prefix.c_str(), i->type()); 00172 delete i; 00173 throw e; 00174 } 00175 00176 std::string varname = std::string(i->path()).substr(prefix.length()); 00177 std::string uid = i->get_string(); 00178 size_t sf; 00179 00180 if ((sf = uid.find("::")) == std::string::npos) { 00181 delete i; 00182 throw Exception("Interface UID '%s' at %s is not valid, missing double colon", 00183 uid.c_str(), i->path()); 00184 } 00185 00186 std::string type = uid.substr(0, sf); 00187 std::string id = uid.substr(sf + 2); 00188 combo_t combo = { type, id, id, writing }; 00189 00190 if ( (sf = id.find("=")) != std::string::npos) { 00191 // we got a mapping 00192 if ( writing ) { 00193 combo.writer_id = id.substr(0, sf); 00194 combo.reader_id = id.substr(sf + 1); 00195 } else { 00196 combo.reader_id = id.substr(0, sf); 00197 combo.writer_id = id.substr(sf + 1); 00198 } 00199 } 00200 00201 __combos[varname] = combo; 00202 } 00203 delete i; 00204 } 00205 00206 00207 void 00208 BlackBoardSynchronizationThread::open_interfaces() 00209 { 00210 logger->log_debug(name(), "Opening interfaces"); 00211 MutexLocker lock(__interfaces.mutex()); 00212 00213 ComboMap::iterator i; 00214 for (i = __combos.begin(); i != __combos.end(); ++i) { 00215 Interface *iface_reader = NULL, *iface_writer = NULL; 00216 00217 BlackBoard *writer_bb = i->second.remote_writer ? __remote_bb : blackboard; 00218 BlackBoard *reader_bb = i->second.remote_writer ? blackboard : __remote_bb; 00219 00220 try { 00221 logger->log_debug(name(), "Opening reading %s (%s:%s)", 00222 i->second.remote_writer ? "locally" : "remotely", 00223 i->second.type.c_str(), i->second.reader_id.c_str()); 00224 iface_reader = reader_bb->open_for_reading(i->second.type.c_str(), 00225 i->second.reader_id.c_str()); 00226 00227 if (iface_reader->has_writer()) { 00228 logger->log_debug(name(), "Opening writing on %s (%s:%s)", 00229 i->second.remote_writer ? "remotely" : "locally", 00230 i->second.type.c_str(), i->second.writer_id.c_str()); 00231 iface_writer = writer_bb->open_for_writing(i->second.type.c_str(), 00232 i->second.writer_id.c_str()); 00233 } 00234 00235 InterfaceInfo ii(&i->second, iface_writer, reader_bb, writer_bb); 00236 __interfaces[iface_reader] = ii; 00237 00238 } catch (Exception &e) { 00239 reader_bb->close(iface_reader); 00240 writer_bb->close(iface_writer); 00241 throw; 00242 } 00243 00244 SyncInterfaceListener *sync_listener = NULL; 00245 if (iface_writer) { 00246 logger->log_debug(name(), "Creating sync listener"); 00247 sync_listener = new SyncInterfaceListener(logger, iface_reader, iface_writer, 00248 reader_bb, writer_bb); 00249 } 00250 __sync_listeners[iface_reader] = sync_listener; 00251 00252 if (i->second.remote_writer) { 00253 __wsl_local->add_interface(iface_reader); 00254 } else { 00255 __wsl_remote->add_interface(iface_reader); 00256 } 00257 } 00258 } 00259 00260 00261 void 00262 BlackBoardSynchronizationThread::close_interfaces() 00263 { 00264 SyncListenerMap::iterator s; 00265 for (s = __sync_listeners.begin(); s != __sync_listeners.end(); ++s) { 00266 if (s->second) { 00267 logger->log_debug(name(), "Closing sync listener %s", s->second->bbil_name()); 00268 delete s->second; 00269 } 00270 } 00271 MutexLocker lock(__interfaces.mutex()); 00272 InterfaceMap::iterator i; 00273 for (i = __interfaces.begin(); i != __interfaces.end(); ++i) { 00274 logger->log_debug(name(), "Closing %s reading interface %s", 00275 i->second.combo->remote_writer ? "local" : "remote", 00276 i->first->uid()); 00277 if (i->second.combo->remote_writer) { 00278 __wsl_local->remove_interface(i->first); 00279 blackboard->close(i->first); 00280 } else { 00281 __wsl_remote->remove_interface(i->first); 00282 __remote_bb->close(i->first); 00283 } 00284 if (i->second.writer) { 00285 logger->log_debug(name(), "Closing %s writing interface %s", 00286 i->second.combo->remote_writer ? "remote" : "local", 00287 i->second.writer->uid()); 00288 if (i->second.combo->remote_writer) { 00289 __remote_bb->close(i->second.writer); 00290 } else { 00291 blackboard->close(i->second.writer); 00292 } 00293 } 00294 } 00295 __interfaces.clear(); 00296 __sync_listeners.clear(); 00297 } 00298 00299 00300 /** A writer has been added for an interface. 00301 * To be called only by SyncWriterInterfaceListener. 00302 * @param interface the interface a writer has been added for. 00303 */ 00304 void 00305 BlackBoardSynchronizationThread::writer_added(fawkes::Interface *interface) throw() 00306 { 00307 MutexLocker lock(__interfaces.mutex()); 00308 00309 if (__interfaces[interface].writer) { 00310 // There exists a writer!? 00311 logger->log_warn(name(), "Writer added for %s, but relay exists already. Bug?", interface->uid()); 00312 } else { 00313 logger->log_warn(name(), "Writer added for %s, opening relay writer", interface->uid()); 00314 00315 Interface *iface = NULL; 00316 SyncInterfaceListener *sync_listener = NULL; 00317 InterfaceInfo &ii = __interfaces[interface]; 00318 try { 00319 iface = ii.writer_bb->open_for_writing(ii.combo->type.c_str(), 00320 ii.combo->writer_id.c_str()); 00321 00322 logger->log_debug(name(), "Creating sync listener for %s:%s-%s", 00323 ii.combo->type.c_str(), ii.combo->reader_id.c_str(), 00324 ii.combo->writer_id.c_str()); 00325 00326 sync_listener = new SyncInterfaceListener(logger, interface, iface, 00327 ii.reader_bb, ii.writer_bb); 00328 00329 __sync_listeners[interface] = sync_listener; 00330 ii.writer = iface; 00331 00332 } catch (Exception &e) { 00333 delete sync_listener; 00334 ii.writer_bb->close(iface); 00335 logger->log_error(name(), "Failed to open writer for %s:%s-%s, sync broken", 00336 ii.combo->type.c_str(), ii.combo->reader_id.c_str(), 00337 ii.combo->writer_id.c_str()); 00338 logger->log_error(name(), e); 00339 } 00340 } 00341 } 00342 00343 00344 /** A writer has been removed for an interface. 00345 * To be called only by SyncWriterInterfaceListener. 00346 * @param interface the interface a writer has been removed for. 00347 */ 00348 void 00349 BlackBoardSynchronizationThread::writer_removed(fawkes::Interface *interface) throw() 00350 { 00351 MutexLocker lock(__interfaces.mutex()); 00352 00353 if (! __interfaces[interface].writer) { 00354 // We do not have a writer!? 00355 logger->log_warn(name(), "Writer removed for %s, but no relay exists. Bug?", interface->uid()); 00356 } else { 00357 logger->log_warn(name(), "Writer removed for %s, closing relay writer", interface->uid()); 00358 00359 InterfaceInfo &ii = __interfaces[interface]; 00360 try { 00361 delete __sync_listeners[interface]; 00362 __sync_listeners[interface] = NULL; 00363 00364 ii.writer_bb->close(ii.writer); 00365 ii.writer = NULL; 00366 00367 } catch (Exception &e) { 00368 logger->log_error(name(), "Failed to close writer for %s:%s-%s, sync broken", 00369 ii.combo->type.c_str(), ii.combo->reader_id.c_str(), 00370 ii.combo->writer_id.c_str()); 00371 logger->log_error(name(), e); 00372 } 00373 } 00374 }