00001 00002 /*************************************************************************** 00003 * server_client_thread.cpp - Thread handling Fawkes network client 00004 * 00005 * Created: Fri Nov 17 17:23:24 2006 00006 * Copyright 2006-2007 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <core/exceptions/system.h> 00025 00026 #include <netcomm/fawkes/server_client_thread.h> 00027 #include <netcomm/fawkes/server_thread.h> 00028 #include <netcomm/fawkes/message_queue.h> 00029 #include <netcomm/fawkes/transceiver.h> 00030 #include <netcomm/socket/stream.h> 00031 #include <netcomm/utils/exceptions.h> 00032 #include <core/threading/mutex.h> 00033 #include <core/threading/wait_condition.h> 00034 00035 #include <unistd.h> 00036 00037 namespace fawkes { 00038 00039 /** @class FawkesNetworkServerClientSendThread <netcomm/fawkes/server_client_thread.h> 00040 * Sending thread for a Fawkes client connected to the server. 00041 * This thread is spawned for each client connected to the server to handle the 00042 * server-side sending 00043 * @ingroup NetComm 00044 * @author Tim Niemueller 00045 */ 00046 00047 class FawkesNetworkServerClientSendThread 00048 : public Thread 00049 { 00050 public: 00051 /** Constructor. 00052 * @param s client stream socket 00053 * @param parent parent FawkesNetworkServerClientThread instance 00054 */ 00055 FawkesNetworkServerClientSendThread(StreamSocket *s, 00056 FawkesNetworkServerClientThread *parent) 00057 : Thread("FawkesNetworkServerClientSendThread", Thread::OPMODE_WAITFORWAKEUP) 00058 { 00059 __s = s; 00060 __parent = parent; 00061 __outbound_mutex = new Mutex(); 00062 __outbound_msgqs[0] = new FawkesNetworkMessageQueue(); 00063 __outbound_msgqs[1] = new FawkesNetworkMessageQueue(); 00064 __outbound_active = 0; 00065 __outbound_msgq = __outbound_msgqs[0]; 00066 } 00067 00068 /** Destructor. */ 00069 ~FawkesNetworkServerClientSendThread() 00070 { 00071 for (unsigned int i = 0; i < 2; ++i) { 00072 while ( ! __outbound_msgqs[i]->empty() ) { 00073 FawkesNetworkMessage *m = __outbound_msgqs[i]->front(); 00074 m->unref(); 00075 __outbound_msgqs[i]->pop(); 00076 } 00077 } 00078 delete __outbound_msgqs[0]; 00079 delete __outbound_msgqs[1]; 00080 delete __outbound_mutex; 00081 } 00082 00083 virtual void loop() 00084 { 00085 if ( ! __parent->alive() ) return; 00086 00087 while ( __outbound_havemore ) { 00088 __outbound_mutex->lock(); 00089 __outbound_havemore = false; 00090 FawkesNetworkMessageQueue *q = __outbound_msgq; 00091 __outbound_active = 1 - __outbound_active; 00092 __outbound_msgq = __outbound_msgqs[__outbound_active]; 00093 __outbound_mutex->unlock(); 00094 00095 if ( ! q->empty() ) { 00096 try { 00097 FawkesNetworkTransceiver::send(__s, q); 00098 } catch (ConnectionDiedException &e) { 00099 __parent->connection_died(); 00100 exit(); 00101 } 00102 } 00103 } 00104 } 00105 00106 00107 /** Enqueue message to outbound queue. 00108 * This enqueues the given message to the outbound queue. The message will 00109 * be sent in the next loop iteration. This method takes ownership of the 00110 * transmitted message. If you want to use the message after enqueuing you 00111 * must reference it explicitly. 00112 * @param msg message to enqueue 00113 */ 00114 void enqueue(FawkesNetworkMessage *msg) 00115 { 00116 __outbound_mutex->lock(); 00117 __outbound_msgq->push(msg); 00118 __outbound_havemore = true; 00119 __outbound_mutex->unlock(); 00120 wakeup(); 00121 } 00122 00123 00124 /** Wait until all data has been sent. */ 00125 void wait_for_all_sent() 00126 { 00127 loop_mutex->lock(); 00128 loop_mutex->unlock(); 00129 } 00130 00131 /** Stub to see name in backtrace for easier debugging. @see Thread::run() */ 00132 protected: virtual void run() { Thread::run(); } 00133 00134 private: 00135 StreamSocket *__s; 00136 FawkesNetworkServerClientThread *__parent; 00137 00138 Mutex *__outbound_mutex; 00139 unsigned int __outbound_active; 00140 bool __outbound_havemore; 00141 FawkesNetworkMessageQueue *__outbound_msgq; 00142 FawkesNetworkMessageQueue *__outbound_msgqs[2]; 00143 00144 }; 00145 00146 00147 /** @class FawkesNetworkServerClientThread netcomm/fawkes/server_client_thread.h 00148 * Fawkes Network Client Thread for server. 00149 * The FawkesNetworkServerThread spawns an instance of this class for every incoming 00150 * connection. It is then used to handle the client. 00151 * The thread will start another thread, an instance of 00152 * FawkesNetworkServerClientSendThread. This will be used to handle all outgoing 00153 * traffic. 00154 * 00155 * @ingroup NetComm 00156 * @author Tim Niemueller 00157 */ 00158 00159 /** Constructor. 00160 * @param s socket to client 00161 * @param parent parent network thread 00162 */ 00163 FawkesNetworkServerClientThread::FawkesNetworkServerClientThread(StreamSocket *s, 00164 FawkesNetworkServerThread *parent) 00165 : Thread("FawkesNetworkServerClientThread") 00166 { 00167 _s = s; 00168 _parent = parent; 00169 _alive = true; 00170 _clid = 0; 00171 _inbound_queue = new FawkesNetworkMessageQueue(); 00172 00173 _send_slave = new FawkesNetworkServerClientSendThread(_s, this); 00174 00175 set_prepfin_conc_loop(true); 00176 } 00177 00178 00179 /** Destructor. */ 00180 FawkesNetworkServerClientThread::~FawkesNetworkServerClientThread() 00181 { 00182 _send_slave->cancel(); 00183 _send_slave->join(); 00184 delete _send_slave; 00185 delete _s; 00186 delete _inbound_queue; 00187 } 00188 00189 00190 /** Get client ID. 00191 * The client ID can be used to send replies. 00192 * @return client ID 00193 */ 00194 unsigned int 00195 FawkesNetworkServerClientThread::clid() const 00196 { 00197 return _clid; 00198 } 00199 00200 00201 /** Set client ID. 00202 * @param client_id new client ID 00203 */ 00204 void 00205 FawkesNetworkServerClientThread::set_clid(unsigned int client_id) 00206 { 00207 _clid = client_id; 00208 } 00209 00210 00211 /** Receive data. 00212 * Receives data from the network if there is any and then dispatches all 00213 * inbound messages via the parent FawkesNetworkThread::dispatch() 00214 */ 00215 void 00216 FawkesNetworkServerClientThread::recv() 00217 { 00218 try { 00219 FawkesNetworkTransceiver::recv(_s, _inbound_queue); 00220 00221 _inbound_queue->lock(); 00222 while ( ! _inbound_queue->empty() ) { 00223 FawkesNetworkMessage *m = _inbound_queue->front(); 00224 m->set_client_id(_clid); 00225 _parent->dispatch(m); 00226 m->unref(); 00227 _inbound_queue->pop(); 00228 } 00229 _parent->wakeup(); 00230 _inbound_queue->unlock(); 00231 00232 } catch (ConnectionDiedException &e) { 00233 _alive = false; 00234 _s->close(); 00235 _parent->wakeup(); 00236 } 00237 } 00238 00239 00240 void 00241 FawkesNetworkServerClientThread::once() 00242 { 00243 _send_slave->start(); 00244 } 00245 00246 00247 /** Thread loop. 00248 * The client thread loop polls on the socket for 10 ms (wait for events 00249 * on the socket like closed connection or data that can be read). If any 00250 * event occurs it is processed. If the connection died or any other 00251 * error occured the thread is cancelled and the parent FawkesNetworkThread 00252 * is woken up to carry out any action that is needed when a client dies. 00253 * If data is available for reading thedata is received and dispatched 00254 * via recv(). 00255 * Afterwards the outbound message queue is processed and alle messages are 00256 * sent. This is also done if the operation could block (POLL_OUT is not 00257 * honored). 00258 */ 00259 void 00260 FawkesNetworkServerClientThread::loop() 00261 { 00262 if ( ! _alive) { 00263 usleep(1000000); 00264 return; 00265 } 00266 00267 short p = 0; 00268 try { 00269 p = _s->poll(); // block until we got a message 00270 } catch (InterruptedException &e) { 00271 // we just ignore this and try it again 00272 return; 00273 } 00274 00275 if ( (p & Socket::POLL_ERR) || 00276 (p & Socket::POLL_HUP) || 00277 (p & Socket::POLL_RDHUP)) { 00278 _alive = false; 00279 _parent->wakeup(); 00280 } else if ( p & Socket::POLL_IN ) { 00281 // Data can be read 00282 recv(); 00283 } 00284 } 00285 00286 /** Enqueue message to outbound queue. 00287 * This enqueues the given message to the outbound queue. The message will be send 00288 * in the next loop iteration. 00289 * @param msg message to enqueue 00290 */ 00291 void 00292 FawkesNetworkServerClientThread::enqueue(FawkesNetworkMessage *msg) 00293 { 00294 _send_slave->enqueue(msg); 00295 } 00296 00297 00298 /** Check aliveness of connection. 00299 * @return true if connection is still alive, false otherwise. 00300 */ 00301 bool 00302 FawkesNetworkServerClientThread::alive() const 00303 { 00304 return _alive; 00305 } 00306 00307 00308 /** Force sending of all pending outbound messages. 00309 * This is a blocking operation. The current poll will be interrupted by sending 00310 * a signal to this thread (and ignoring it) and then wait for the sending to 00311 * finish. 00312 */ 00313 void 00314 FawkesNetworkServerClientThread::force_send() 00315 { 00316 _send_slave->wait_for_all_sent(); 00317 } 00318 00319 00320 /** Connection died notification. 00321 * To be called only be the send slave thread. 00322 */ 00323 void 00324 FawkesNetworkServerClientThread::connection_died() 00325 { 00326 _alive = false; 00327 _parent->wakeup(); 00328 } 00329 00330 } // end namespace fawkes