Fawkes API Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * fuse_client.cpp - FUSE network transport client 00004 * 00005 * Created: Thu Mar 29 00:47:24 2007 00006 * Copyright 2005-2007 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <fvutils/net/fuse_client.h> 00025 00026 #include <fvutils/net/fuse_transceiver.h> 00027 #include <fvutils/net/fuse_message_queue.h> 00028 #include <fvutils/net/fuse_message.h> 00029 #include <fvutils/net/fuse_client_handler.h> 00030 00031 #include <core/threading/mutex.h> 00032 #include <core/threading/wait_condition.h> 00033 #include <core/exceptions/software.h> 00034 #include <netcomm/socket/stream.h> 00035 #include <netcomm/utils/exceptions.h> 00036 00037 #include <cstring> 00038 #include <netinet/in.h> 00039 #include <cstdlib> 00040 #include <unistd.h> 00041 00042 using namespace fawkes; 00043 00044 namespace firevision { 00045 #if 0 /* just to make Emacs auto-indent happy */ 00046 } 00047 #endif 00048 00049 /** @class FuseClient <fvutils/net/fuse_client.h> 00050 * FUSE client. 00051 * FUSE is the FireVision protocol to retrieve information, images and lookup 00052 * tables from vision processes and to send control commands to these systems. 00053 * The client is used in the retrieving or controlling process. 00054 * @ingroup FUSE 00055 * @ingroup FireVision 00056 * @author Tim Niemueller 00057 */ 00058 00059 /** Constructor. 00060 * @param hostname host to connect to 00061 * @param port port to connect to 00062 * @param handler client handler to handle incoming data 00063 */ 00064 FuseClient::FuseClient(const char *hostname, unsigned short int port, 00065 FuseClientHandler *handler) 00066 : Thread("FuseClient") 00067 { 00068 __hostname = strdup(hostname); 00069 __port = port; 00070 __handler = handler; 00071 00072 __wait_timeout = 10; 00073 00074 __inbound_msgq = new FuseNetworkMessageQueue(); 00075 __outbound_msgq = new FuseNetworkMessageQueue(); 00076 00077 __mutex = new Mutex(); 00078 __recv_mutex = new Mutex(); 00079 __recv_waitcond = new WaitCondition(__recv_mutex); 00080 __socket = new StreamSocket(); 00081 __greeting_mutex = new Mutex(); 00082 __greeting_waitcond = new WaitCondition(__greeting_mutex); 00083 00084 __alive = true; 00085 __greeting_received = false; 00086 } 00087 00088 00089 /** Destructor. */ 00090 FuseClient::~FuseClient() 00091 { 00092 free(__hostname); 00093 00094 while ( ! __inbound_msgq->empty() ) { 00095 FuseNetworkMessage *m = __inbound_msgq->front(); 00096 m->unref(); 00097 __inbound_msgq->pop(); 00098 } 00099 delete __inbound_msgq; 00100 00101 while ( ! __outbound_msgq->empty() ) { 00102 FuseNetworkMessage *m = __outbound_msgq->front(); 00103 m->unref(); 00104 __outbound_msgq->pop(); 00105 } 00106 delete __outbound_msgq; 00107 00108 delete __mutex; 00109 delete __recv_mutex; 00110 delete __recv_waitcond; 00111 delete __socket; 00112 delete __greeting_mutex; 00113 delete __greeting_waitcond; 00114 } 00115 00116 00117 /** Connect. */ 00118 void 00119 FuseClient::connect() 00120 { 00121 __socket->connect(__hostname, __port); 00122 00123 FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t)); 00124 greetmsg->version = htonl(FUSE_CURRENT_VERSION); 00125 __outbound_msgq->push(new FuseNetworkMessage(FUSE_MT_GREETING, 00126 greetmsg, sizeof(FUSE_greeting_message_t))); 00127 } 00128 00129 00130 /** Disconnect. */ 00131 void 00132 FuseClient::disconnect() 00133 { 00134 __mutex->lock(); 00135 delete __socket; 00136 __socket = new StreamSocket(); 00137 __alive = false; 00138 __mutex->unlock(); 00139 } 00140 00141 00142 /** Send queued messages. */ 00143 void 00144 FuseClient::send() 00145 { 00146 try { 00147 FuseNetworkTransceiver::send(__socket, __outbound_msgq); 00148 } catch (ConnectionDiedException &e) { 00149 e.print_trace(); 00150 __socket->close(); 00151 __alive = false; 00152 __handler->fuse_connection_died(); 00153 __recv_waitcond->wake_all(); 00154 } 00155 } 00156 00157 00158 /** Receive messages. */ 00159 void 00160 FuseClient::recv() 00161 { 00162 __recv_mutex->lock(); 00163 try { 00164 while ( __socket->available() ) { 00165 FuseNetworkTransceiver::recv(__socket, __inbound_msgq); 00166 } 00167 } catch (ConnectionDiedException &e) { 00168 e.print_trace(); 00169 __socket->close(); 00170 __alive = false; 00171 __handler->fuse_connection_died(); 00172 __recv_waitcond->wake_all(); 00173 } 00174 __recv_mutex->unlock(); 00175 } 00176 00177 00178 /** Enqueue message. 00179 * This method takes ownership of the passed message. You must explicitly 00180 * reference it before enqueing if you want to use it afterwards. 00181 * @param m message to enqueue 00182 */ 00183 void 00184 FuseClient::enqueue(FuseNetworkMessage *m) 00185 { 00186 __outbound_msgq->push_locked(m); 00187 } 00188 00189 00190 /** Enqueue message. 00191 * @param type type of message 00192 * @param payload payload of message 00193 * @param payload_size size of payload 00194 */ 00195 void 00196 FuseClient::enqueue(FUSE_message_type_t type, void *payload, size_t payload_size) 00197 { 00198 FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size); 00199 __outbound_msgq->push_locked(m); 00200 } 00201 00202 00203 /** Enqueue message without payload. 00204 * @param type type of message 00205 */ 00206 void 00207 FuseClient::enqueue(FUSE_message_type_t type) 00208 { 00209 FuseNetworkMessage *m = new FuseNetworkMessage(type); 00210 __outbound_msgq->push_locked(m); 00211 } 00212 00213 00214 /** Enqueue message and wait for reply. 00215 * The wait happens atomically, use this to avoid race conditions. This method 00216 * takes ownership of the passed message. You must explicitly reference it 00217 * before enqueing if you want to use it afterwards. 00218 * @param m message to enqueue 00219 */ 00220 void 00221 FuseClient::enqueue_and_wait(FuseNetworkMessage *m) 00222 { 00223 __recv_mutex->lock(); 00224 __outbound_msgq->push_locked(m); 00225 __recv_waitcond->wait(); 00226 __recv_mutex->unlock(); 00227 } 00228 00229 00230 /** Enqueue message and wait for reply. 00231 * The wait happens atomically, use this to avoid race conditions. 00232 * @param type type of message 00233 * @param payload payload of message 00234 * @param payload_size size of payload 00235 */ 00236 void 00237 FuseClient::enqueue_and_wait(FUSE_message_type_t type, void *payload, size_t payload_size) 00238 { 00239 FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size); 00240 __recv_mutex->lock(); 00241 __outbound_msgq->push_locked(m); 00242 __recv_waitcond->wait(); 00243 __recv_mutex->unlock(); 00244 } 00245 00246 00247 /** Enqueue message without payload and wait for reply. 00248 * The wait happens atomically, use this to avoid race conditions. 00249 * @param type type of message 00250 */ 00251 void 00252 FuseClient::enqueue_and_wait(FUSE_message_type_t type) 00253 { 00254 FuseNetworkMessage *m = new FuseNetworkMessage(type); 00255 __recv_mutex->lock(); 00256 __outbound_msgq->push_locked(m); 00257 __recv_waitcond->wait(); 00258 __recv_mutex->unlock(); 00259 } 00260 00261 00262 00263 /** Sleep for some time. 00264 * Wait until inbound messages have been receive, the connection dies or the 00265 * timeout has been reached, whatever comes first. So you sleep at most timeout ms, 00266 * but short under some circumstances (incoming data or lost connection). 00267 */ 00268 void 00269 FuseClient::sleep() 00270 { 00271 try { 00272 __socket->poll(__wait_timeout /* ms timeout */, Socket::POLL_IN); 00273 } catch (Exception &e) { 00274 } 00275 } 00276 00277 00278 /** Thread loop. 00279 * Sends enqueued messages and reads incoming messages off the network. 00280 */ 00281 void 00282 FuseClient::loop() 00283 { 00284 __mutex->lock(); 00285 00286 if ( ! __alive ) { 00287 __mutex->unlock(); 00288 usleep(10000); 00289 return; 00290 } 00291 00292 bool wake = false; 00293 00294 send(); 00295 sleep(); 00296 recv(); 00297 00298 //process_inbound(); 00299 00300 __inbound_msgq->lock(); 00301 while ( ! __inbound_msgq->empty() ) { 00302 FuseNetworkMessage *m = __inbound_msgq->front(); 00303 00304 if ( m->type() == FUSE_MT_GREETING ) { 00305 FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>(); 00306 if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) { 00307 __handler->fuse_invalid_server_version(FUSE_CURRENT_VERSION, ntohl(gm->version)); 00308 __alive = false; 00309 } else { 00310 __greeting_mutex->lock(); 00311 __greeting_received = true; 00312 __greeting_waitcond->wake_all(); 00313 __greeting_mutex->unlock(); 00314 __handler->fuse_connection_established(); 00315 } 00316 } else { 00317 __handler->fuse_inbound_received(m); 00318 wake = true; 00319 } 00320 00321 m->unref(); 00322 __inbound_msgq->pop(); 00323 } 00324 __inbound_msgq->unlock(); 00325 00326 if ( wake ) { 00327 __recv_waitcond->wake_all(); 00328 } 00329 __mutex->unlock(); 00330 } 00331 00332 00333 /** Wait for messages. 00334 * This will wait for messages to arrive. The calling 00335 * thread is blocked until messages are available. 00336 */ 00337 void 00338 FuseClient::wait() 00339 { 00340 __recv_mutex->lock(); 00341 __recv_waitcond->wait(); 00342 __recv_mutex->unlock(); 00343 } 00344 00345 00346 /** Wait for greeting message. 00347 * This method will wait for the greeting message to arrive. Make sure that you called 00348 * connect() before waiting or call it concurrently in another thread. The calling thread 00349 * will be blocked until the message has been received. If the message has already been 00350 * received this method will return immediately. Thus it is safe to call this at any time 00351 * without risking a race condition. 00352 */ 00353 void 00354 FuseClient::wait_greeting() 00355 { 00356 __greeting_mutex->lock(); 00357 while (! __greeting_received) { 00358 __greeting_waitcond->wait(); 00359 } 00360 __greeting_mutex->unlock(); 00361 } 00362 00363 } // end namespace firevision