Fawkes API  Fawkes Development Version
fuse_server_client_thread.cpp
1 
2 /***************************************************************************
3  * fuse_server_client_thread.cpp - client thread for FuseServer
4  *
5  * Created: Tue Nov 13 20:00:55 2007
6  * Copyright 2005-2007 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <core/exceptions/system.h>
25 #include <fvutils/compression/jpeg_compressor.h>
26 #include <fvutils/ipc/shm_image.h>
27 #include <fvutils/ipc/shm_lut.h>
28 #include <fvutils/net/fuse_image_content.h>
29 #include <fvutils/net/fuse_imagelist_content.h>
30 #include <fvutils/net/fuse_lut_content.h>
31 #include <fvutils/net/fuse_lutlist_content.h>
32 #include <fvutils/net/fuse_message_queue.h>
33 #include <fvutils/net/fuse_server.h>
34 #include <fvutils/net/fuse_server_client_thread.h>
35 #include <fvutils/net/fuse_transceiver.h>
36 #include <logging/liblogger.h>
37 #include <netcomm/socket/stream.h>
38 #include <netcomm/utils/exceptions.h>
39 #include <netinet/in.h>
40 
41 #include <cstdlib>
42 #include <cstring>
43 #include <unistd.h>
44 
45 using namespace fawkes;
46 
47 namespace firevision {
48 
49 /** @class FuseServerClientThread <fvutils/net/fuse_server_client_thread.h>
50  * FUSE Server Client Thread.
51  * This thread is instantiated and started for each client that connects to a
52  * FuseServer.
53  * @ingroup FUSE
54  * @ingroup FireVision
55  * @author Tim Niemueller
56  */
57 
58 /** Constructor.
59  * @param fuse_server parent FUSE server
60  * @param s socket to client
61  */
62 FuseServerClientThread::FuseServerClientThread(FuseServer *fuse_server, StreamSocket *s)
63 : Thread("FuseServerClientThread")
64 {
65  fuse_server_ = fuse_server;
66  socket_ = s;
67  jpeg_compressor_ = NULL;
68 
69  inbound_queue_ = new FuseNetworkMessageQueue();
70  outbound_queue_ = new FuseNetworkMessageQueue();
71 
72  FUSE_greeting_message_t *greetmsg =
74  greetmsg->version = htonl(FUSE_CURRENT_VERSION);
75  outbound_queue_->push(
76  new FuseNetworkMessage(FUSE_MT_GREETING, greetmsg, sizeof(FUSE_greeting_message_t)));
77 
78  alive_ = true;
79 }
80 
81 /** Destructor. */
83 {
84  delete socket_;
85  delete jpeg_compressor_;
86 
87  for (bit_ = buffers_.begin(); bit_ != buffers_.end(); ++bit_) {
88  delete bit_->second;
89  }
90  buffers_.clear();
91 
92  for (lit_ = luts_.begin(); lit_ != luts_.end(); ++lit_) {
93  delete lit_->second;
94  }
95  luts_.clear();
96 
97  while (!inbound_queue_->empty()) {
98  FuseNetworkMessage *m = inbound_queue_->front();
99  m->unref();
100  inbound_queue_->pop();
101  }
102 
103  while (!outbound_queue_->empty()) {
104  FuseNetworkMessage *m = outbound_queue_->front();
105  m->unref();
106  outbound_queue_->pop();
107  }
108 
109  delete inbound_queue_;
110  delete outbound_queue_;
111 }
112 
113 /** Send all messages in outbound queue. */
114 void
116 {
117  if (!outbound_queue_->empty()) {
118  try {
119  FuseNetworkTransceiver::send(socket_, outbound_queue_);
120  } catch (Exception &e) {
121  fuse_server_->connection_died(this);
122  alive_ = false;
123  }
124  }
125 }
126 
127 /** Receive data.
128  * Receives data from the network if there is any and then processes all
129  * inbound messages.
130  */
131 void
133 {
134  try {
135  FuseNetworkTransceiver::recv(socket_, inbound_queue_);
136  } catch (ConnectionDiedException &e) {
137  socket_->close();
138  fuse_server_->connection_died(this);
139  alive_ = false;
140  }
141 }
142 
143 /** Process greeting message.
144  * @param m received message
145  */
146 void
148 {
150  if (ntohl(gm->version) != FUSE_CURRENT_VERSION) {
151  throw Exception("Invalid version on other side");
152  }
153 }
154 
156 FuseServerClientThread::get_shmimgbuf(const char *id)
157 {
158  char tmp_image_id[IMAGE_ID_MAX_LENGTH + 1];
159  tmp_image_id[IMAGE_ID_MAX_LENGTH] = 0;
160  strncpy(tmp_image_id, id, IMAGE_ID_MAX_LENGTH);
161 
162  if ((bit_ = buffers_.find(tmp_image_id)) == buffers_.end()) {
163  // the buffer has not yet been opened
164  try {
165  SharedMemoryImageBuffer *b = new SharedMemoryImageBuffer(tmp_image_id);
166  buffers_[tmp_image_id] = b;
167  return b;
168  } catch (Exception &e) {
169  throw;
170  }
171  } else {
172  return bit_->second;
173  }
174 }
175 
176 /** Process image request message.
177  * @param m received message
178  */
179 void
181 {
183 
185  try {
186  b = get_shmimgbuf(irm->image_id);
187  } catch (Exception &e) {
188  FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
189  m->payload(),
190  m->payload_size(),
191  /* copy payload */ true);
192  outbound_queue_->push(nm);
193  return;
194  }
195 
196  if (irm->format == FUSE_IF_RAW) {
197  FuseImageContent *im = new FuseImageContent(b);
198  outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
199  } else if (irm->format == FUSE_IF_JPEG) {
200  if (!jpeg_compressor_) {
201  jpeg_compressor_ = new JpegImageCompressor();
203  }
204  b->lock_for_read();
205  jpeg_compressor_->set_image_dimensions(b->width(), b->height());
206  jpeg_compressor_->set_image_buffer(b->colorspace(), b->buffer());
207  unsigned char *compressed_buffer =
208  (unsigned char *)malloc(jpeg_compressor_->recommended_compressed_buffer_size());
209  jpeg_compressor_->set_destination_buffer(
210  compressed_buffer, jpeg_compressor_->recommended_compressed_buffer_size());
211  jpeg_compressor_->compress();
212  b->unlock();
213  size_t compressed_buffer_size = jpeg_compressor_->compressed_size();
214  long int sec = 0, usec = 0;
215  b->capture_time(&sec, &usec);
216  FuseImageContent *im = new FuseImageContent(FUSE_IF_JPEG,
217  b->image_id(),
218  compressed_buffer,
219  compressed_buffer_size,
220  CS_UNKNOWN,
221  b->width(),
222  b->height(),
223  sec,
224  usec);
225  outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
226  free(compressed_buffer);
227  } else {
228  FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
229  m->payload(),
230  m->payload_size(),
231  /* copy payload */ true);
232  outbound_queue_->push(nm);
233  }
234 }
235 
236 /** Process image info request message.
237  * @param m received message
238  */
239 void
241 {
243 
245  try {
246  b = get_shmimgbuf(idm->image_id);
247 
248  FUSE_imageinfo_t *ii = (FUSE_imageinfo_t *)calloc(1, sizeof(FUSE_imageinfo_t));
249 
250  strncpy(ii->image_id, b->image_id(), IMAGE_ID_MAX_LENGTH - 1);
251  ii->colorspace = htons(b->colorspace());
252  ii->width = htonl(b->width());
253  ii->height = htonl(b->height());
254  ii->buffer_size = colorspace_buffer_size(b->colorspace(), b->width(), b->height());
255 
256  FuseNetworkMessage *nm =
257  new FuseNetworkMessage(FUSE_MT_IMAGE_INFO, ii, sizeof(FUSE_imageinfo_t));
258  outbound_queue_->push(nm);
259  } catch (Exception &e) {
260  FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
261  m->payload(),
262  m->payload_size(),
263  /* copy payload */ true);
264  outbound_queue_->push(nm);
265  }
266 }
267 
268 /** Process LUT request message.
269  * @param m received message
270  */
271 void
273 {
275 
276  char tmp_lut_id[LUT_ID_MAX_LENGTH + 1];
277  tmp_lut_id[LUT_ID_MAX_LENGTH] = 0;
278  strncpy(tmp_lut_id, idm->lut_id, LUT_ID_MAX_LENGTH);
279 
280  if ((lit_ = luts_.find(tmp_lut_id)) != luts_.end()) {
281  // the buffer had already be opened
282  FuseLutContent *lm = new FuseLutContent(lit_->second);
283  outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
284  } else {
285  try {
287  luts_[tmp_lut_id] = b;
288  FuseLutContent *lm = new FuseLutContent(b);
289  outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
290  } catch (Exception &e) {
291  // could not open the shared memory segment for some reason, send failure
292  FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_LUT_FAILED,
293  m->payload(),
294  m->payload_size(),
295  /* copy payload */ true);
296  outbound_queue_->push(nm);
297  }
298  }
299 }
300 
301 /** Process LUT setting.
302  * @param m received message
303  */
304 void
306 {
307  FuseLutContent * lc = m->msgc<FuseLutContent>();
309  strncpy(reply->lut_id, lc->lut_id(), LUT_ID_MAX_LENGTH - 1);
310  // Currently we expect colormaps, so make sure we get sensible dimensions
311 
313  if ((lit_ = luts_.find(lc->lut_id())) != luts_.end()) {
314  // the buffer had already been opened
315  b = lit_->second;
316  } else {
317  try {
318  b = new SharedMemoryLookupTable(lc->lut_id(), /* read only */ false);
319  luts_[lc->lut_id()] = b;
320  } catch (Exception &e) {
321  outbound_queue_->push(
322  new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED, reply, sizeof(FUSE_lutdesc_message_t)));
323  e.append("Cannot open shared memory lookup table %s", lc->lut_id());
324  LibLogger::log_warn("FuseServerClientThread", e);
325  delete lc;
326  return;
327  }
328  }
329 
330  if ((b->width() != lc->width()) || (b->height() != lc->height()) || (b->depth() != lc->depth())
331  || (b->bytes_per_cell() != lc->bytes_per_cell())) {
332  outbound_queue_->push(
333  new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED, reply, sizeof(FUSE_lutdesc_message_t)));
334  LibLogger::log_warn("FuseServerClientThread",
335  "LUT upload: dimensions do not match. "
336  "Existing (%u,%u,%u,%u) != uploaded (%u,%u,%u,%u)",
337  b->width(),
338  b->height(),
339  b->depth(),
340  b->bytes_per_cell(),
341  lc->width(),
342  lc->height(),
343  lc->depth(),
344  lc->bytes_per_cell());
345  } else {
346  b->set(lc->buffer());
347  outbound_queue_->push(
348  new FuseNetworkMessage(FUSE_MT_SET_LUT_SUCCEEDED, reply, sizeof(FUSE_lutdesc_message_t)));
349  }
350 
351  delete lc;
352 }
353 
354 /** Process image list request message.
355  * @param m received message
356  */
357 void
359 {
361 
363  SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
364  SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
365 
366  while (i != endi) {
368  dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
369  if (ih) {
370  ilm->add_imageinfo(ih->image_id(), ih->colorspace(), ih->width(), ih->height());
371  }
372 
373  ++i;
374  }
375 
376  delete h;
377 
378  outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_IMAGE_LIST, ilm));
379 }
380 
381 /** Process LUT list request message.
382  * @param m received message
383  */
384 void
386 {
388 
390  SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_LUT_MAGIC_TOKEN, h);
391  SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
392 
393  while (i != endi) {
395  dynamic_cast<const SharedMemoryLookupTableHeader *>(*i);
396  if (lh) {
397  llm->add_lutinfo(lh->lut_id(), lh->width(), lh->height(), lh->depth(), lh->bytes_per_cell());
398  }
399 
400  ++i;
401  }
402 
403  delete h;
404 
405  outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_LUT_LIST, llm));
406 }
407 
408 /** Process inbound messages. */
409 void
410 FuseServerClientThread::process_inbound()
411 {
412  inbound_queue_->lock();
413  while (!inbound_queue_->empty()) {
414  FuseNetworkMessage *m = inbound_queue_->front();
415 
416  try {
417  switch (m->type()) {
418  case FUSE_MT_GREETING: process_greeting_message(m); break;
419  case FUSE_MT_GET_IMAGE: process_getimage_message(m); break;
420  case FUSE_MT_GET_IMAGE_INFO: process_getimageinfo_message(m); break;
421  case FUSE_MT_GET_IMAGE_LIST: process_getimagelist_message(m); break;
422  case FUSE_MT_GET_LUT_LIST: process_getlutlist_message(m); break;
423  case FUSE_MT_GET_LUT: process_getlut_message(m); break;
424  case FUSE_MT_SET_LUT: process_setlut_message(m); break;
425  default: throw Exception("Unknown message type received\n");
426  }
427  } catch (Exception &e) {
428  e.append("FUSE protocol error");
429  LibLogger::log_warn("FuseServerClientThread", e);
430  fuse_server_->connection_died(this);
431  alive_ = false;
432  }
433 
434  m->unref();
435  inbound_queue_->pop();
436  }
437  inbound_queue_->unlock();
438 }
439 
440 void
442 {
443  if (!alive_) {
444  usleep(10000);
445  return;
446  }
447 
448  short p = 0;
449  try {
450  p = socket_->poll(10); // block for up to 10 ms
451  } catch (InterruptedException &e) {
452  // we just ignore this and try it again
453  return;
454  }
455 
456  if ((p & Socket::POLL_ERR) || (p & Socket::POLL_HUP) || (p & Socket::POLL_RDHUP)) {
457  fuse_server_->connection_died(this);
458  alive_ = false;
459  } else if (p & Socket::POLL_IN) {
460  try {
461  // Data can be read
462  recv();
463  process_inbound();
464  } catch (...) {
465  fuse_server_->connection_died(this);
466  alive_ = false;
467  }
468  }
469 
470  if (alive_) {
471  send();
472  }
473 }
474 
475 } // end namespace firevision
void process_setlut_message(FuseNetworkMessage *m)
Process LUT setting.
FireVision FUSE protocol server.
Definition: fuse_server.h:43
static void recv(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
Definition: fuse.h:157
virtual void close()
Close socket.
Definition: socket.cpp:311
void process_getlut_message(FuseNetworkMessage *m)
Process LUT request message.
void connection_died(FuseServerClientThread *client)
Connection died.
Shared memory image buffer header.
Definition: shm_image.h:66
const char * lut_id() const
Get LUT ID.
Definition: shm_lut.cpp:486
FUSE lookup table content.
void process_greeting_message(FuseNetworkMessage *m)
Process greeting message.
unsigned int depth() const
Depth of LUT.
Image request message.
Definition: fuse.h:147
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
virtual void set_compression_destination(ImageCompressor::CompressionDestination cd)
Set compression destination.
void unlock() const
Unlock list.
Definition: lock_queue.h:128
Image info message.
Definition: fuse.h:167
Fawkes library namespace.
virtual size_t compressed_size()
Get compressed size.
Shared Memory iterator.
Definition: shm.h:118
unsigned int height() const
Get LUT height.
Definition: shm_lut.cpp:453
virtual void compress()
Compress image.
unsigned int width() const
Width of LUT.
Shared memory lookup table header.
Definition: shm_lut.h:48
void process_getimagelist_message(FuseNetworkMessage *m)
Process image list request message.
unsigned char * buffer() const
Get buffer.
Image description message.
Definition: fuse.h:155
Thread class encapsulation of pthreads.
Definition: thread.h:45
static void send(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq)
Send messages.
TCP stream socket over IP.
Definition: stream.h:31
Jpeg image compressor.
uint32_t width
width in pixels
Definition: fuse.h:172
void add_lutinfo(const char *lut_id, unsigned int width, unsigned int height, unsigned int depth, unsigned int bytes_per_cell)
Add LUT info.
unsigned int height() const
Get height.
Definition: shm_image.cpp:826
const char * lut_id() const
Get LUT ID.
void * payload() const
Get pointer to payload.
uint32_t colorspace
color space
Definition: fuse.h:170
unsigned int width() const
Get LUT width.
Definition: shm_lut.cpp:442
virtual void set_image_buffer(colorspace_t cspace, unsigned char *buffer)
Set image buffer to compress.
MT * msgc() const
Get correctly parsed output.
Definition: fuse_message.h:108
uint32_t version
version from FUSE_version_t
Definition: fuse.h:99
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
Definition: fuse.h:169
unsigned int height() const
Height of LUT.
FUSE Network Message.
Definition: fuse_message.h:39
uint32_t type() const
Get message type.
void process_getimageinfo_message(FuseNetworkMessage *m)
Process image info request message.
Base class for exceptions in Fawkes.
Definition: exception.h:35
unsigned int bytes_per_cell() const
Get bytes per cell.
Definition: shm_lut.cpp:475
const char * image_id() const
Get image number.
Definition: shm_image.cpp:838
FUSE lookup table list content.
version packet, bi-directional
Definition: fuse.h:97
unsigned int bytes_per_cell() const
Bytes per cell in LUT.
A LockQueue of FuseNetworkMessage to hold messages in inbound and outbound queues.
Shared memory image buffer.
Definition: shm_image.h:183
virtual void set_image_dimensions(unsigned int width, unsigned int height)
Set dimensions of image to compress.
virtual void set_destination_buffer(unsigned char *buf, unsigned int buf_size)
Set destination buffer (if compressing to memory).
void send()
Send all messages in outbound queue.
void process_getimage_message(FuseNetworkMessage *m)
Process image request message.
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
Definition: fuse.h:149
The current system call has been interrupted (for instance by a signal).
Definition: system.h:38
write compressed image to buffer in memory
void add_imageinfo(const char *image_id, colorspace_t colorspace, unsigned int pixel_width, unsigned int pixel_height)
Add image info.
unsigned int width() const
Get width.
Definition: shm_image.cpp:814
void process_getlutlist_message(FuseNetworkMessage *m)
Process LUT list request message.
colorspace_t colorspace() const
Get colorspace.
Definition: shm_image.cpp:802
Thrown if the connection died during an operation.
Definition: exceptions.h:31
char lut_id[LUT_ID_MAX_LENGTH]
LUT ID.
Definition: fuse.h:163
uint32_t format
requested image format, see FUSE_image_format_t
Definition: fuse.h:150
uint32_t height
height in pixels
Definition: fuse.h:173
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
Definition: socket.cpp:685
void lock() const
Lock queue.
Definition: lock_queue.h:114
uint32_t buffer_size
size of following image buffer in bytes
Definition: fuse.h:174
virtual size_t recommended_compressed_buffer_size()
Get the recommended size for the compressed buffer.
virtual void loop()
Code to execute in the thread.
MT * msg() const
Get correctly casted payload.
Definition: fuse_message.h:67
Shared memory lookup table.
Definition: shm_lut.h:112
void append(const char *format,...)
Append messages to the message list.
Definition: exception.cpp:333
LUT description message.
Definition: fuse.h:161
size_t payload_size() const
Get payload size.
unsigned int depth() const
Get LUT depth.
Definition: shm_lut.cpp:464