Fawkes API Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * server_thread.cpp - Fawkes Network Protocol (server part) 00004 * 00005 * Created: Sun Nov 19 15:08:30 2006 00006 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <netcomm/fawkes/server_thread.h> 00025 #include <netcomm/fawkes/server_client_thread.h> 00026 #include <netcomm/utils/acceptor_thread.h> 00027 #include <netcomm/fawkes/message.h> 00028 #include <netcomm/fawkes/handler.h> 00029 #include <netcomm/fawkes/message_queue.h> 00030 #include <netcomm/fawkes/message_content.h> 00031 #include <core/threading/thread_collector.h> 00032 #include <core/threading/mutex.h> 00033 #include <core/exception.h> 00034 00035 #include <unistd.h> 00036 00037 namespace fawkes { 00038 00039 /** @class FawkesNetworkServerThread <netcomm/fawkes/server_thread.h> 00040 * Fawkes Network Thread. 00041 * Maintains a list of clients and reacts on events triggered by the clients. 00042 * Also runs the acceptor thread. 00043 * 00044 * @ingroup NetComm 00045 * @author Tim Niemueller 00046 */ 00047 00048 /** Constructor. 00049 * @param thread_collector thread collector to register new threads with 00050 * @param fawkes_port port for Fawkes network protocol 00051 */ 00052 FawkesNetworkServerThread::FawkesNetworkServerThread(unsigned int fawkes_port, 00053 ThreadCollector *thread_collector) 00054 : Thread("FawkesNetworkServerThread", Thread::OPMODE_WAITFORWAKEUP) 00055 { 00056 this->thread_collector = thread_collector; 00057 clients.clear(); 00058 next_client_id = 1; 00059 inbound_messages = new FawkesNetworkMessageQueue(); 00060 00061 acceptor_thread = new NetworkAcceptorThread(this, fawkes_port, 00062 "FawkesNetworkAcceptorThread"); 00063 if ( thread_collector ) { 00064 thread_collector->add(acceptor_thread); 00065 } else { 00066 acceptor_thread->start(); 00067 } 00068 } 00069 00070 00071 /** Destructor. */ 00072 FawkesNetworkServerThread::~FawkesNetworkServerThread() 00073 { 00074 for (cit = clients.begin(); cit != clients.end(); ++cit) { 00075 if ( thread_collector ) { 00076 thread_collector->remove((*cit).second); 00077 } else { 00078 (*cit).second->cancel(); 00079 (*cit).second->join(); 00080 } 00081 delete (*cit).second; 00082 } 00083 if ( thread_collector ) { 00084 thread_collector->remove(acceptor_thread); 00085 } else { 00086 acceptor_thread->cancel(); 00087 acceptor_thread->join(); 00088 } 00089 delete acceptor_thread; 00090 00091 delete inbound_messages; 00092 } 00093 00094 00095 /** Add a new connection. 00096 * Called by the NetworkAcceptorThread if a new client connected. 00097 * @param s socket for new client 00098 */ 00099 void 00100 FawkesNetworkServerThread::add_connection(StreamSocket *s) throw() 00101 { 00102 FawkesNetworkServerClientThread *client = new FawkesNetworkServerClientThread(s, this); 00103 00104 clients.lock(); 00105 client->set_clid(next_client_id); 00106 if ( thread_collector ) { 00107 thread_collector->add(client); 00108 } else { 00109 client->start(); 00110 } 00111 clients[next_client_id] = client; 00112 for (hit = handlers.begin(); hit != handlers.end(); ++hit) { 00113 (*hit).second->client_connected(next_client_id); 00114 } 00115 ++next_client_id; 00116 clients.unlock(); 00117 00118 wakeup(); 00119 } 00120 00121 00122 /** Add a handler. 00123 * @param handler to add. 00124 */ 00125 void 00126 FawkesNetworkServerThread::add_handler(FawkesNetworkHandler *handler) 00127 { 00128 handlers.lock(); 00129 if ( handlers.find(handler->id()) != handlers.end()) { 00130 handlers.unlock(); 00131 throw Exception("Handler already registered"); 00132 } 00133 handlers[handler->id()] = handler; 00134 handlers.unlock(); 00135 } 00136 00137 00138 /** Remove handler. 00139 * @param handler handler to remove 00140 */ 00141 void 00142 FawkesNetworkServerThread::remove_handler(FawkesNetworkHandler *handler) 00143 { 00144 handlers.lock(); 00145 if( handlers.find(handler->id()) != handlers.end() ) { 00146 handlers.erase(handler->id()); 00147 } 00148 handlers.unlock(); 00149 } 00150 00151 00152 /** Fawkes network thread loop. 00153 * The thread loop will check all clients for their alivness and dead 00154 * clients are removed. Then inbound messages are processed and dispatched 00155 * properly to registered handlers. Then the thread waits for a new event 00156 * to happen (event emitting threads need to wakeup this thread!). 00157 */ 00158 void 00159 FawkesNetworkServerThread::loop() 00160 { 00161 clients.lock(); 00162 00163 // check for dead clients 00164 cit = clients.begin(); 00165 while (cit != clients.end()) { 00166 if ( ! cit->second->alive() ) { 00167 if ( thread_collector ) { 00168 thread_collector->remove((*cit).second); 00169 } else { 00170 cit->second->cancel(); 00171 cit->second->join(); 00172 } 00173 usleep(5000); 00174 delete cit->second; 00175 unsigned int clid = (*cit).first; 00176 ++cit; 00177 clients.erase(clid); 00178 for (hit = handlers.begin(); hit != handlers.end(); ++hit) { 00179 (*hit).second->client_disconnected(clid); 00180 } 00181 } else { 00182 ++cit; 00183 } 00184 } 00185 00186 // dispatch messages 00187 inbound_messages->lock(); 00188 while ( ! inbound_messages->empty() ) { 00189 FawkesNetworkMessage *m = inbound_messages->front(); 00190 if ( handlers.find(m->cid()) != handlers.end()) { 00191 handlers[m->cid()]->handle_network_message(m); 00192 } 00193 m->unref(); 00194 inbound_messages->pop(); 00195 } 00196 inbound_messages->unlock(); 00197 00198 clients.unlock(); 00199 } 00200 00201 00202 /** Force sending of all pending messages. */ 00203 void 00204 FawkesNetworkServerThread::force_send() 00205 { 00206 clients.lock(); 00207 for (cit = clients.begin(); cit != clients.end(); ++cit) { 00208 (*cit).second->force_send(); 00209 } 00210 clients.unlock(); 00211 } 00212 00213 00214 /** Broadcast a message. 00215 * Method to broadcast a message to all connected clients. This method will take 00216 * ownership of the passed message. If you want to use if after enqueing it you 00217 * must reference it explicitly before calling this method. 00218 * @param msg Message to broadcast 00219 */ 00220 void 00221 FawkesNetworkServerThread::broadcast(FawkesNetworkMessage *msg) 00222 { 00223 for (cit = clients.begin(); cit != clients.end(); ++cit) { 00224 if ( (*cit).second->alive() ) { 00225 msg->ref(); 00226 (*cit).second->enqueue(msg); 00227 } 00228 } 00229 msg->unref(); 00230 } 00231 00232 00233 /** Broadcast a message. 00234 * A FawkesNetworkMessage is created and broacasted via the emitter. 00235 * @param component_id component ID 00236 * @param msg_id message type id 00237 * @param payload payload buffer 00238 * @param payload_size size of payload buffer 00239 * @see FawkesNetworkEmitter::broadcast() 00240 */ 00241 void 00242 FawkesNetworkServerThread::broadcast(unsigned short int component_id, 00243 unsigned short int msg_id, 00244 void *payload, unsigned int payload_size) 00245 { 00246 FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id, 00247 payload, payload_size); 00248 broadcast(m); 00249 } 00250 00251 00252 /** Broadcast message without payload. 00253 * @param component_id component ID 00254 * @param msg_id message type ID 00255 */ 00256 void 00257 FawkesNetworkServerThread::broadcast(unsigned short int component_id, unsigned short int msg_id) 00258 { 00259 FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id); 00260 broadcast(m); 00261 } 00262 00263 00264 /** Send a message. 00265 * Method to send a message to a specific client. 00266 * The client ID provided in the message is used to determine the correct 00267 * recipient. If no client is connected for the given client ID the message 00268 * shall be silently ignored. 00269 * This method will take ownership of the passed message. If you want to use 00270 * if after enqueing it you must reference it explicitly before calling this 00271 * method. 00272 * Implemented Emitter interface message. 00273 * @param msg Message to send 00274 */ 00275 void 00276 FawkesNetworkServerThread::send(FawkesNetworkMessage *msg) 00277 { 00278 unsigned int clid = msg->clid(); 00279 if ( clients.find(clid) != clients.end() ) { 00280 if ( clients[clid]->alive() ) { 00281 clients[clid]->enqueue(msg); 00282 } else { 00283 throw Exception("Client %u not alive", clid); 00284 } 00285 } else { 00286 throw Exception("Client %u not found", clid); 00287 } 00288 } 00289 00290 00291 /** Send a message. 00292 * A FawkesNetworkMessage is created and sent via the emitter. 00293 * @param to_clid client ID of recipient 00294 * @param component_id component ID 00295 * @param msg_id message type id 00296 * @param payload payload buffer 00297 * @param payload_size size of payload buffer 00298 * @see FawkesNetworkEmitter::broadcast() 00299 */ 00300 void 00301 FawkesNetworkServerThread::send(unsigned int to_clid, 00302 unsigned short int component_id, unsigned short int msg_id, 00303 void *payload, unsigned int payload_size) 00304 { 00305 FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id, 00306 payload, payload_size); 00307 send(m); 00308 } 00309 00310 00311 /** Send a message. 00312 * A FawkesNetworkMessage is created and sent via the emitter. 00313 * @param to_clid client ID of recipient 00314 * @param component_id component ID 00315 * @param msg_id message type id 00316 * @param content Fawkes complex network message content 00317 * @see FawkesNetworkEmitter::broadcast() 00318 */ 00319 void 00320 FawkesNetworkServerThread::send(unsigned int to_clid, 00321 unsigned short int component_id, unsigned short int msg_id, 00322 FawkesNetworkMessageContent *content) 00323 { 00324 FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id, 00325 content); 00326 send(m); 00327 } 00328 00329 00330 /** Send a message without payload. 00331 * A FawkesNetworkMessage with empty payload is created and sent via the emitter. 00332 * This is particularly useful for simple status messages that you want to send. 00333 * @param to_clid client ID of recipient 00334 * @param component_id component ID 00335 * @param msg_id message type id 00336 * @see FawkesNetworkEmitter::broadcast() 00337 */ 00338 void 00339 FawkesNetworkServerThread::send(unsigned int to_clid, 00340 unsigned short int component_id, unsigned short int msg_id) 00341 { 00342 FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id); 00343 send(m); 00344 } 00345 00346 00347 /** Dispatch messages. 00348 * Actually messages are just put into the inbound message queue and dispatched 00349 * during the next loop iteration. So after adding all the messages you have 00350 * to wakeup the thread to get them actually dispatched. 00351 * @param msg message to dispatch 00352 */ 00353 void 00354 FawkesNetworkServerThread::dispatch(FawkesNetworkMessage *msg) 00355 { 00356 msg->ref(); 00357 inbound_messages->push_locked(msg); 00358 } 00359 00360 } // end namespace fawkes