router.cc
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "router.h"
00023 #include "scoped_lock.h"
00024 #include "data.h"
00025 #include "protostructs.h"
00026 #include "usbwrap.h"
00027 #include "endian.h"
00028 #include "debug.h"
00029 #include <unistd.h>
00030
00031 namespace Barry {
00032
00033
00034
00035
00036 void SocketRoutingQueue::SocketDataHandler::Error(Barry::Error &error)
00037 {
00038
00039 eout("SocketDataHandler: Error: " << error.what());
00040 (void) error;
00041 }
00042
00043 SocketRoutingQueue::SocketDataHandler::~SocketDataHandler()
00044 {
00045
00046 }
00047
00048
00049
00050
00051 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count,
00052 int default_read_timeout)
00053 : m_dev(0)
00054 , m_writeEp(0)
00055 , m_readEp(0)
00056 , m_interest(false)
00057 , m_seen_usb_error(false)
00058 , m_timeout(default_read_timeout)
00059 , m_continue_reading(false)
00060 {
00061 pthread_mutex_init(&m_mutex, NULL);
00062
00063 pthread_mutex_init(&m_readwaitMutex, NULL);
00064 pthread_cond_init(&m_readwaitCond, NULL);
00065
00066 AllocateBuffers(prealloc_buffer_count);
00067 }
00068
00069 SocketRoutingQueue::~SocketRoutingQueue()
00070 {
00071
00072 if( m_continue_reading ) {
00073 m_continue_reading = false;
00074 pthread_join(m_usb_read_thread, NULL);
00075 }
00076 }
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087 void SocketRoutingQueue::ReturnBuffer(Data *buf)
00088 {
00089
00090 m_free.push(buf);
00091 }
00092
00093
00094
00095
00096
00097
00098 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
00099 {
00100 SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
00101
00102
00103 q->m_seen_usb_error = false;
00104 while( q->m_continue_reading ) {
00105 try {
00106 q->DoRead(1000);
00107 }
00108 catch (std::runtime_error const &e) {
00109 eout("SimpleReadThread received uncaught exception: " << typeid(e).name() << " what: " << e.what());
00110 }
00111 catch (...) {
00112 eout("SimpleReadThread recevied uncaught exception of unknown type");
00113 }
00114 }
00115 return 0;
00116 }
00117
00118
00119
00120
00121
00122
00123
00124
00125 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp,
00126 SocketDataHandlerPtr callback)
00127 {
00128 scoped_lock lock(m_mutex);
00129 m_dev = dev;
00130 m_usb_error_dev_callback = callback;
00131 m_writeEp = writeEp;
00132 m_readEp = readEp;
00133 }
00134
00135 void SocketRoutingQueue::ClearUsbDevice()
00136 {
00137 scoped_lock lock(m_mutex);
00138 m_dev = 0;
00139 m_usb_error_dev_callback.reset();
00140 lock.unlock();
00141
00142
00143
00144 scoped_lock wait(m_readwaitMutex);
00145 pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
00146 }
00147
00148 bool SocketRoutingQueue::UsbDeviceReady()
00149 {
00150 scoped_lock lock(m_mutex);
00151 return m_dev != 0 && !m_seen_usb_error;
00152 }
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163 void SocketRoutingQueue::AllocateBuffers(int count)
00164 {
00165 int todo = count - m_free.size();
00166
00167 for( int i = 0; i < todo; i++ ) {
00168
00169 m_free.push( new Data );
00170 }
00171 }
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
00185 {
00186 DataHandle buf = DefaultRead(timeout);
00187 if( !buf.get() )
00188 return false;
00189
00190
00191 receive = *buf.get();
00192 return true;
00193 }
00194
00195
00196
00197
00198 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
00199 {
00200 if( m_seen_usb_error && timeout == -1 ) {
00201
00202
00203
00204
00205
00206 timeout = 0;
00207 }
00208
00209
00210
00211 Data *buf = m_default.wait_pop(timeout == -1 ? m_timeout : timeout);
00212 return DataHandle(*this, buf);
00213 }
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233 void SocketRoutingQueue::RegisterInterest(SocketId socket,
00234 SocketDataHandlerPtr handler)
00235 {
00236
00237 scoped_lock lock(m_mutex);
00238
00239 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00240 if( qi != m_socketQueues.end() )
00241 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
00242
00243 m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler) );
00244 m_interest = true;
00245 }
00246
00247
00248
00249
00250
00251
00252
00253
00254 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
00255 {
00256
00257 scoped_lock lock(m_mutex);
00258
00259 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00260 if( qi == m_socketQueues.end() )
00261 return;
00262
00263
00264 m_free.append_from( qi->second->m_queue );
00265
00266
00267 m_socketQueues.erase( qi );
00268
00269
00270 m_interest = m_socketQueues.size() > 0;
00271 }
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
00291 {
00292 DataHandle buf = SocketRead(socket, timeout);
00293 if( !buf.get() )
00294 return false;
00295
00296
00297 receive = *buf.get();
00298 return true;
00299 }
00300
00301
00302
00303
00304
00305
00306
00307 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
00308 {
00309 QueueEntryPtr qep;
00310 DataQueue *dq = 0;
00311
00312
00313 {
00314 scoped_lock lock(m_mutex);
00315 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00316 if( qi == m_socketQueues.end() )
00317 throw std::logic_error("SocketRead requested data from unregistered socket.");
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330 qep = qi->second;
00331 dq = &qep->m_queue;
00332 }
00333
00334
00335
00336 Data *buf = dq->wait_pop(timeout == -1 ? m_timeout : timeout);
00337
00338
00339
00340 {
00341 scoped_lock lock(m_mutex);
00342 qep.reset();
00343 }
00344
00345 return DataHandle(*this, buf);
00346 }
00347
00348
00349 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
00350 {
00351 scoped_lock lock(m_mutex);
00352 SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
00353 if( qi == m_socketQueues.end() )
00354 return false;
00355 return qi->second->m_queue.size() > 0;
00356 }
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371 void SocketRoutingQueue::DoRead(int timeout)
00372 {
00373 class ReadWaitSignal
00374 {
00375 pthread_mutex_t &m_Mutex;
00376 pthread_cond_t &m_Cond;
00377 public:
00378 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
00379 : m_Mutex(mut), m_Cond(cond)
00380 {}
00381 ~ReadWaitSignal()
00382 {
00383 scoped_lock wait(m_Mutex);
00384 pthread_cond_signal(&m_Cond);
00385 }
00386 } readwait(m_readwaitMutex, m_readwaitCond);
00387
00388 Usb::Device * volatile dev = 0;
00389 int readEp;
00390 DataHandle buf(*this, 0);
00391
00392
00393 {
00394 scoped_lock lock(m_mutex);
00395
00396 if( !m_dev || m_seen_usb_error ) {
00397 lock.unlock();
00398
00399
00400 usleep(125000);
00401 return;
00402 }
00403
00404 dev = m_dev;
00405 readEp = m_readEp;
00406
00407
00408 Data *raw = m_free.pop();
00409 if( !raw )
00410 buf = DataHandle(*this, new Data);
00411 else
00412 buf = DataHandle(*this, raw);
00413 }
00414
00415
00416
00417 try {
00418
00419 Data &data = *buf.get();
00420
00421 if( !dev->BulkRead(readEp, data, timeout) )
00422 return;
00423
00424 MAKE_PACKET(pack, data);
00425
00426
00427 if( data.GetSize() < SB_PACKET_SOCKET_SIZE )
00428 return;
00429
00430
00431 uint16_t socket = btohs(pack->socket);
00432
00433
00434
00435 scoped_lock lock(m_mutex);
00436
00437
00438 if( m_interest ) {
00439 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00440 if( qi != m_socketQueues.end() ) {
00441 SocketDataHandlerPtr &sdh = qi->second->m_handler;
00442
00443
00444 if( sdh ) {
00445
00446 lock.unlock();
00447 sdh->DataReceived(*buf.get());
00448 return;
00449 }
00450 else {
00451 qi->second->m_queue.push(buf.release());
00452 return;
00453 }
00454 }
00455
00456
00457 }
00458
00459
00460 lock.unlock();
00461
00462
00463 m_default.push(buf.release());
00464 }
00465 catch( Usb::Timeout & ) {
00466
00467 }
00468 catch( Usb::Error &ue ) {
00469
00470
00471 m_seen_usb_error = true;
00472
00473
00474
00475
00476
00477
00478 scoped_lock lock(m_mutex);
00479 std::vector<SocketDataHandlerPtr> handlers;
00480 SocketQueueMap::iterator qi = m_socketQueues.begin();
00481 while( qi != m_socketQueues.end() ) {
00482 SocketDataHandlerPtr &sdh = qi->second->m_handler;
00483
00484 if( sdh ) {
00485 handlers.push_back(sdh);
00486 }
00487 ++qi;
00488 }
00489
00490 SocketDataHandlerPtr usb_error_handler = m_usb_error_dev_callback;
00491
00492 lock.unlock();
00493 std::vector<SocketDataHandlerPtr>::iterator hi = handlers.begin();
00494 while( hi != handlers.end() ) {
00495 (*hi)->Error(ue);
00496 ++hi;
00497 }
00498
00499
00500 if( usb_error_handler.get() ) {
00501 usb_error_handler->Error(ue);
00502 }
00503 }
00504 }
00505
00506 void SocketRoutingQueue::SpinoffSimpleReadThread()
00507 {
00508
00509 if( m_continue_reading )
00510 return;
00511 m_continue_reading = true;
00512
00513
00514 int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
00515 if( ret ) {
00516 m_continue_reading = false;
00517 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret);
00518 }
00519 }
00520
00521 }
00522