Fawkes API Fawkes Development Version

fuse_server_client_thread.cpp

00001 
00002 /***************************************************************************
00003  *  fuse_server_client_thread.cpp - client thread for FuseServer
00004  *
00005  *  Created: Tue Nov 13 20:00:55 2007
00006  *  Copyright  2005-2007  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <fvutils/net/fuse_server_client_thread.h>
00025 
00026 #include <fvutils/net/fuse_server.h>
00027 #include <fvutils/net/fuse_server.h>
00028 #include <fvutils/net/fuse_transceiver.h>
00029 #include <fvutils/net/fuse_message_queue.h>
00030 #include <fvutils/net/fuse_image_content.h>
00031 #include <fvutils/net/fuse_lut_content.h>
00032 #include <fvutils/net/fuse_imagelist_content.h>
00033 #include <fvutils/net/fuse_lutlist_content.h>
00034 #include <fvutils/ipc/shm_image.h>
00035 #include <fvutils/ipc/shm_lut.h>
00036 #include <fvutils/compression/jpeg_compressor.h>
00037 
00038 #include <core/exceptions/system.h>
00039 #include <netcomm/socket/stream.h>
00040 #include <netcomm/utils/exceptions.h>
00041 #include <utils/logging/liblogger.h>
00042 
00043 #include <netinet/in.h>
00044 #include <cstring>
00045 #include <cstdlib>
00046 
00047 using namespace fawkes;
00048 
00049 namespace firevision {
00050 #if 0 /* just to make Emacs auto-indent happy */
00051 }
00052 #endif
00053 
00054 /** @class FuseServerClientThread <fvutils/net/fuse_server_client_thread.h>
00055  * FUSE Server Client Thread.
00056  * This thread is instantiated and started for each client that connects to a
00057  * FuseServer.
00058  * @ingroup FUSE
00059  * @ingroup FireVision
00060  * @author Tim Niemueller
00061  */
00062 
00063 /** Constructor.
00064  * @param fuse_server parent FUSE server
00065  * @param s socket to client
00066  */
00067 FuseServerClientThread::FuseServerClientThread(FuseServer *fuse_server, StreamSocket *s)
00068   : Thread("FuseServerClientThread")
00069 {
00070   __fuse_server = fuse_server;
00071   __socket = s;
00072   __jpeg_compressor = NULL;
00073 
00074   __inbound_queue  = new FuseNetworkMessageQueue();
00075   __outbound_queue  = new FuseNetworkMessageQueue();
00076 
00077   FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t));
00078   greetmsg->version = htonl(FUSE_CURRENT_VERSION);
00079   __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_GREETING,
00080                                                 greetmsg, sizeof(FUSE_greeting_message_t)));
00081 
00082   __alive = true;
00083 }
00084 
00085 
00086 /** Destructor. */
00087 FuseServerClientThread::~FuseServerClientThread()
00088 {
00089   delete __socket;
00090   delete __jpeg_compressor;
00091 
00092   for (__bit = __buffers.begin(); __bit != __buffers.end(); ++__bit) {
00093     delete __bit->second;
00094   }
00095   __buffers.clear();
00096 
00097   for (__lit = __luts.begin(); __lit != __luts.end(); ++__lit ) {
00098     delete __lit->second;
00099   }
00100   __luts.clear();
00101 
00102   while ( ! __inbound_queue->empty() ) {
00103     FuseNetworkMessage *m = __inbound_queue->front();
00104     m->unref();
00105     __inbound_queue->pop();
00106   }
00107 
00108   while ( ! __outbound_queue->empty() ) {
00109     FuseNetworkMessage *m = __outbound_queue->front();
00110     m->unref();
00111     __outbound_queue->pop();
00112   }
00113 
00114   delete __inbound_queue;
00115   delete __outbound_queue;
00116 }
00117 
00118 
00119 /** Send all messages in outbound queue. */
00120 void
00121 FuseServerClientThread::send()
00122 {
00123   if ( ! __outbound_queue->empty() ) {
00124     try {
00125       FuseNetworkTransceiver::send(__socket, __outbound_queue);
00126     } catch (Exception &e) {
00127       __fuse_server->connection_died(this);
00128       __alive = false;
00129     }
00130   }
00131 }
00132 
00133 
00134 /** Receive data.
00135  * Receives data from the network if there is any and then processes all
00136  * inbound messages.
00137  */
00138 void
00139 FuseServerClientThread::recv()
00140 {
00141   try {
00142     FuseNetworkTransceiver::recv(__socket, __inbound_queue);
00143   } catch (ConnectionDiedException &e) {
00144     __socket->close();
00145     __fuse_server->connection_died(this);
00146     __alive = false;
00147   }
00148 }
00149 
00150 
00151 /** Process greeting message.
00152  * @param m received message
00153  */
00154 void
00155 FuseServerClientThread::process_greeting_message(FuseNetworkMessage *m)
00156 {
00157   FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>();
00158   if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) {
00159     throw Exception("Invalid version on other side");
00160   }
00161 }
00162 
00163 
00164 SharedMemoryImageBuffer *
00165 FuseServerClientThread::get_shmimgbuf(const char *id)
00166 {
00167   char tmp_image_id[IMAGE_ID_MAX_LENGTH + 1];
00168   tmp_image_id[IMAGE_ID_MAX_LENGTH] = 0;
00169   strncpy(tmp_image_id, id, IMAGE_ID_MAX_LENGTH);
00170 
00171   if ( (__bit = __buffers.find( tmp_image_id )) == __buffers.end() ) {
00172     // the buffer has not yet been opened
00173     try {
00174       SharedMemoryImageBuffer *b = new SharedMemoryImageBuffer(tmp_image_id);
00175       __buffers[tmp_image_id] = b;
00176       return b;
00177     } catch (Exception &e) {
00178       throw;
00179     }
00180   } else {
00181     return __bit->second;
00182   }
00183 }
00184 
00185 
00186 /** Process image request message.
00187  * @param m received message
00188  */
00189 void
00190 FuseServerClientThread::process_getimage_message(FuseNetworkMessage *m)
00191 {
00192   FUSE_imagereq_message_t *irm = m->msg<FUSE_imagereq_message_t>();
00193 
00194   SharedMemoryImageBuffer *b;
00195   try {
00196     b = get_shmimgbuf(irm->image_id);
00197   } catch (Exception &e) {
00198     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00199                                                     m->payload(), m->payload_size(),
00200                                                     /* copy payload */ true);
00201     __outbound_queue->push(nm);
00202     return;
00203   }
00204 
00205   if ( irm->format == FUSE_IF_RAW ) {
00206     FuseImageContent *im = new FuseImageContent(b);
00207     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
00208   } else if ( irm->format == FUSE_IF_JPEG ) {
00209     if ( ! __jpeg_compressor) {
00210       __jpeg_compressor = new JpegImageCompressor();
00211       __jpeg_compressor->set_compression_destination(ImageCompressor::COMP_DEST_MEM);
00212     }
00213     b->lock_for_read();
00214     __jpeg_compressor->set_image_dimensions(b->width(), b->height());
00215     __jpeg_compressor->set_image_buffer(b->colorspace(), b->buffer());
00216     unsigned char *compressed_buffer = (unsigned char *)malloc(__jpeg_compressor->recommended_compressed_buffer_size());
00217     __jpeg_compressor->set_destination_buffer(compressed_buffer, __jpeg_compressor->recommended_compressed_buffer_size());
00218     __jpeg_compressor->compress();
00219     b->unlock();
00220     size_t compressed_buffer_size = __jpeg_compressor->compressed_size();
00221     long int sec = 0, usec = 0;
00222     b->capture_time(&sec, &usec);
00223     FuseImageContent *im = new FuseImageContent(FUSE_IF_JPEG, b->image_id(),
00224                                                 compressed_buffer, compressed_buffer_size,
00225                                                 CS_UNKNOWN, b->width(), b->height(),
00226                                                 sec, usec);
00227     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
00228     free(compressed_buffer);
00229   } else {
00230     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00231                                                     m->payload(), m->payload_size(),
00232                                                     /* copy payload */ true);
00233     __outbound_queue->push(nm);
00234   }
00235 }
00236 
00237 /** Process image info request message.
00238  * @param m received message
00239  */
00240 void
00241 FuseServerClientThread::process_getimageinfo_message(FuseNetworkMessage *m)
00242 {
00243   FUSE_imagedesc_message_t *idm = m->msg<FUSE_imagedesc_message_t>();
00244 
00245   SharedMemoryImageBuffer *b;
00246   try {
00247     b = get_shmimgbuf(idm->image_id);
00248 
00249     FUSE_imageinfo_t *ii = (FUSE_imageinfo_t *)calloc(1, sizeof(FUSE_imageinfo_t));
00250     
00251     strncpy(ii->image_id, b->image_id(), IMAGE_ID_MAX_LENGTH);
00252     ii->colorspace = htons(b->colorspace());
00253     ii->width = htonl(b->width());
00254     ii->height = htonl(b->height());
00255     ii->buffer_size = colorspace_buffer_size(b->colorspace(), b->width(), b->height());
00256 
00257     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_IMAGE_INFO,
00258                                                     ii, sizeof(FUSE_imageinfo_t));
00259     __outbound_queue->push(nm);
00260   } catch (Exception &e) {
00261     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00262                                                     m->payload(), m->payload_size(),
00263                                                     /* copy payload */ true);
00264     __outbound_queue->push(nm);
00265   }
00266 }
00267 
00268 
00269 /** Process LUT request message.
00270  * @param m received message
00271  */
00272 void
00273 FuseServerClientThread::process_getlut_message(FuseNetworkMessage *m)
00274 {
00275   FUSE_lutdesc_message_t *idm = m->msg<FUSE_lutdesc_message_t>();
00276 
00277   char tmp_lut_id[LUT_ID_MAX_LENGTH + 1];
00278   tmp_lut_id[LUT_ID_MAX_LENGTH] = 0;
00279   strncpy(tmp_lut_id, idm->lut_id, LUT_ID_MAX_LENGTH);
00280 
00281   if ( (__lit = __luts.find( tmp_lut_id )) != __luts.end() ) {
00282     // the buffer had already be opened
00283     FuseLutContent *lm = new FuseLutContent(__lit->second);
00284     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
00285   } else {
00286     try {
00287       SharedMemoryLookupTable *b = new SharedMemoryLookupTable(tmp_lut_id);
00288       __luts[tmp_lut_id] = b;
00289       FuseLutContent *lm = new FuseLutContent(b);
00290       __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
00291     } catch (Exception &e) {
00292       // could not open the shared memory segment for some reason, send failure
00293       FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_LUT_FAILED,
00294                                                       m->payload(), m->payload_size(),
00295                                                       /* copy payload */ true);
00296       __outbound_queue->push(nm);
00297     }
00298   }
00299 }
00300 
00301 
00302 /** Process LUT setting.
00303  * @param m received message
00304  */
00305 void
00306 FuseServerClientThread::process_setlut_message(FuseNetworkMessage *m)
00307 {
00308   FuseLutContent *lc = m->msgc<FuseLutContent>();  
00309   FUSE_lutdesc_message_t *reply = (FUSE_lutdesc_message_t *)malloc(sizeof(FUSE_lutdesc_message_t));
00310   strncpy(reply->lut_id, lc->lut_id(), LUT_ID_MAX_LENGTH);
00311   // Currently we expect colormaps, so make sure we get sensible dimensions
00312 
00313   SharedMemoryLookupTable *b;
00314   if ( (__lit = __luts.find( lc->lut_id() )) != __luts.end() ) {
00315     // the buffer had already been opened
00316     b = __lit->second;
00317   } else {
00318     try {
00319       b = new SharedMemoryLookupTable(lc->lut_id(), /* read only */ false);
00320       __luts[lc->lut_id()] = b;
00321     } catch (Exception &e) {
00322       __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
00323                                                     reply, sizeof(FUSE_lutdesc_message_t)));
00324       e.append("Cannot open shared memory lookup table %s", lc->lut_id());
00325       LibLogger::log_warn("FuseServerClientThread", e);
00326       delete lc;
00327       return;
00328     }
00329   }
00330 
00331   if ( (b->width() != lc->width())   ||
00332        (b->height() != lc->height()) ||
00333        (b->depth() != lc->depth())   ||
00334        (b->bytes_per_cell() != lc->bytes_per_cell()) ) {
00335     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
00336                                                   reply, sizeof(FUSE_lutdesc_message_t)));
00337     LibLogger::log_warn("FuseServerClientThread", "LUT upload: dimensions do not match. "
00338                         "Existing (%u,%u,%u,%u) != uploaded (%u,%u,%u,%u)",
00339                         b->width(), b->height(), b->depth(), b->bytes_per_cell(),
00340                         lc->width(), lc->height(), lc->depth(), lc->bytes_per_cell());
00341   } else {
00342     b->set(lc->buffer());
00343     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_SUCCEEDED,
00344                                                   reply, sizeof(FUSE_lutdesc_message_t)));
00345   }
00346 
00347   delete lc;
00348 }
00349 
00350 
00351 /** Process image list request message.
00352  * @param m received message
00353  */
00354 void
00355 FuseServerClientThread::process_getimagelist_message(FuseNetworkMessage *m)
00356 {
00357   FuseImageListContent *ilm = new FuseImageListContent();
00358 
00359   SharedMemoryImageBufferHeader *h      = new SharedMemoryImageBufferHeader();
00360   SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
00361   SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
00362                                                             
00363   while ( i != endi ) {
00364     const SharedMemoryImageBufferHeader *ih = dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
00365     if ( ih ) {
00366       ilm->add_imageinfo(ih->image_id(), ih->colorspace(), ih->width(), ih->height());
00367     }
00368 
00369     ++i;
00370   }
00371 
00372   delete h;
00373 
00374   __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE_LIST, ilm));
00375 }
00376 
00377 
00378 /** Process LUT list request message.
00379  * @param m received message
00380  */
00381 void
00382 FuseServerClientThread::process_getlutlist_message(FuseNetworkMessage *m)
00383 {
00384   FuseLutListContent *llm = new FuseLutListContent();
00385 
00386   SharedMemoryLookupTableHeader *h = new SharedMemoryLookupTableHeader();
00387   SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_LUT_MAGIC_TOKEN, h);
00388   SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
00389                                                             
00390   while ( i != endi ) {
00391     const SharedMemoryLookupTableHeader *lh = dynamic_cast<const SharedMemoryLookupTableHeader *>(*i);
00392     if ( lh ) {
00393       llm->add_lutinfo(lh->lut_id(), lh->width(), lh->height(), lh->depth(), lh->bytes_per_cell());
00394     }
00395 
00396     ++i;
00397   }
00398 
00399   delete h;
00400 
00401   __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT_LIST, llm));
00402 }
00403 
00404 
00405 /** Process inbound messages. */
00406 void
00407 FuseServerClientThread::process_inbound()
00408 {
00409   __inbound_queue->lock();
00410   while ( ! __inbound_queue->empty() ) {
00411     FuseNetworkMessage *m = __inbound_queue->front();
00412 
00413     try {
00414       switch (m->type()) {
00415       case FUSE_MT_GREETING:
00416         process_greeting_message(m);
00417         break;
00418       case FUSE_MT_GET_IMAGE:
00419         process_getimage_message(m);
00420         break;
00421       case FUSE_MT_GET_IMAGE_INFO:
00422         process_getimageinfo_message(m);
00423         break;
00424       case FUSE_MT_GET_IMAGE_LIST:
00425         process_getimagelist_message(m);
00426         break;
00427       case FUSE_MT_GET_LUT_LIST:
00428         process_getlutlist_message(m);
00429         break;
00430       case FUSE_MT_GET_LUT:
00431         process_getlut_message(m);
00432         break;
00433       case FUSE_MT_SET_LUT:
00434         process_setlut_message(m);
00435         break;
00436       default:
00437         throw Exception("Unknown message type received\n");
00438       }
00439     } catch (Exception &e) {
00440       e.append("FUSE protocol error");
00441       LibLogger::log_warn("FuseServerClientThread", e);
00442       __fuse_server->connection_died(this);
00443       __alive = false;
00444     }
00445 
00446     m->unref();
00447     __inbound_queue->pop();
00448   }
00449   __inbound_queue->unlock();
00450 }
00451 
00452 
00453 void
00454 FuseServerClientThread::loop()
00455 {
00456   if ( ! __alive ) {
00457     usleep(10000);
00458     return;
00459   }
00460 
00461   short p = 0;
00462   try {
00463     p = __socket->poll(10); // block for up to 10 ms
00464   } catch (InterruptedException &e) {
00465     // we just ignore this and try it again
00466     return;
00467   }
00468 
00469   if ( (p & Socket::POLL_ERR) ||
00470        (p & Socket::POLL_HUP) ||
00471        (p & Socket::POLL_RDHUP)) {
00472     __fuse_server->connection_died(this);
00473     __alive = false;
00474   } else if ( p & Socket::POLL_IN ) {
00475     try {
00476       // Data can be read
00477       recv();
00478       process_inbound();
00479     }
00480     catch (...) {
00481       __fuse_server->connection_died(this);
00482       __alive = false;
00483     }
00484   }
00485 
00486   if ( __alive ) {
00487     send();
00488   }
00489 }
00490 
00491 } // end namespace firevision
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends