Fawkes API Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * fuse_transceiver.cpp - Fuse transceiver 00004 * 00005 * Created: Wed Nov 14 13:30:34 2007 00006 * Copyright 2006-2007 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <fvutils/net/fuse_transceiver.h> 00025 #include <fvutils/net/fuse_message_queue.h> 00026 #include <fvutils/net/fuse_message.h> 00027 #include <netcomm/socket/stream.h> 00028 #include <netcomm/utils/exceptions.h> 00029 00030 #include <netinet/in.h> 00031 #include <cstdlib> 00032 00033 using namespace fawkes; 00034 00035 namespace firevision { 00036 #if 0 /* just to make Emacs auto-indent happy */ 00037 } 00038 #endif 00039 00040 /** @class FuseNetworkTransceiver <fvutils/net/fuse_transceiver.h> 00041 * FUSE Network Transceiver. 00042 * Utility class that provides methods to send and receive messages via 00043 * the network. Operates on message queues and a given socket. 00044 * 00045 * @ingroup FUSE 00046 * @ingroup FireVision 00047 * @author Tim Niemueller 00048 */ 00049 00050 /** Send messages. 00051 * @param s socket over which the data shall be transmitted. 00052 * @param msgq message queue that contains the messages that have to be sent 00053 * @exception ConnectionDiedException Thrown if any error occurs during the 00054 * operation since for any error the conncetion is considered dead. 00055 */ 00056 void 00057 FuseNetworkTransceiver::send(StreamSocket *s, FuseNetworkMessageQueue *msgq) 00058 { 00059 msgq->lock(); 00060 try { 00061 while ( ! msgq->empty() ) { 00062 FuseNetworkMessage *m = msgq->front(); 00063 m->pack(); 00064 const FUSE_message_t &f = m->fmsg(); 00065 unsigned int payload_size = m->payload_size(); 00066 s->write(&(f.header), sizeof(f.header)); 00067 s->write(f.payload, payload_size); 00068 m->unref(); 00069 msgq->pop(); 00070 } 00071 } catch (SocketException &e) { 00072 msgq->unlock(); 00073 throw ConnectionDiedException("Write failed"); 00074 } 00075 msgq->unlock(); 00076 } 00077 00078 00079 /** Receive data. 00080 * This method receives all messages currently available from the network, or 00081 * a limited number depending on max_num_msgs. If max_num_msgs is 0 then all 00082 * messages are read. Note that on a busy connection this may cause recv() to 00083 * never return! The default is to return after 8 messages. 00084 * The messages are stored in the supplied message queue. 00085 * @param s socket to gather messages from 00086 * @param msgq message queue to store received messages in 00087 * @param max_num_msgs maximum number of messages to read from stream in one go. 00088 * @exception ConnectionDiedException Thrown if any error occurs during the 00089 * operation since for any error the conncetion is considered dead. 00090 */ 00091 void 00092 FuseNetworkTransceiver::recv(StreamSocket *s, FuseNetworkMessageQueue *msgq, 00093 unsigned int max_num_msgs) 00094 { 00095 msgq->lock(); 00096 00097 try { 00098 unsigned int num_msgs = 0; 00099 do { 00100 FUSE_message_t msg; 00101 s->read(&(msg.header), sizeof(msg.header)); 00102 00103 unsigned int payload_size = ntohl(msg.header.payload_size); 00104 00105 if ( payload_size > 0 ) { 00106 00107 msg.payload = malloc(payload_size); 00108 s->read(msg.payload, payload_size); 00109 } else { 00110 msg.payload = NULL; 00111 } 00112 00113 FuseNetworkMessage *m = new FuseNetworkMessage(&msg); 00114 msgq->push(m); 00115 00116 ++num_msgs; 00117 } while ( s->available() && (num_msgs < max_num_msgs) ); 00118 } catch (SocketException &e) { 00119 msgq->unlock(); 00120 throw ConnectionDiedException("Read failed"); 00121 } 00122 msgq->unlock(); 00123 } 00124 00125 } // end namespace firevision