00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "router.h"
00023 #include "scoped_lock.h"
00024 #include "data.h"
00025 #include "protostructs.h"
00026 #include "usbwrap.h"
00027 #include "endian.h"
00028 #include "debug.h"
00029 #include <unistd.h>
00030
00031 namespace Barry {
00032
00033
00034
00035
00036 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count)
00037 : m_dev(0)
00038 , m_writeEp(0)
00039 , m_readEp(0)
00040 , m_interest(false)
00041 , m_continue_reading(false)
00042 {
00043 pthread_mutex_init(&m_mutex, NULL);
00044
00045 pthread_mutex_init(&m_readwaitMutex, NULL);
00046 pthread_cond_init(&m_readwaitCond, NULL);
00047
00048 AllocateBuffers(prealloc_buffer_count);
00049 }
00050
00051 SocketRoutingQueue::~SocketRoutingQueue()
00052 {
00053
00054 if( m_continue_reading ) {
00055 m_continue_reading = false;
00056 pthread_join(m_usb_read_thread, NULL);
00057 }
00058 }
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069 void SocketRoutingQueue::ReturnBuffer(Data *buf)
00070 {
00071
00072 m_free.push(buf);
00073 }
00074
00075
00076
00077
00078
00079
00080 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
00081 {
00082 SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
00083
00084
00085 std::string msg;
00086 while( q->m_continue_reading ) {
00087 if( !q->DoRead(msg, 1000) ) {
00088 eout("Error in SimpleReadThread: " << msg);
00089 }
00090 }
00091 return 0;
00092 }
00093
00094
00095
00096
00097
00098
00099
00100
00101 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp)
00102 {
00103 scoped_lock lock(m_mutex);
00104 m_dev = dev;
00105 m_writeEp = writeEp;
00106 m_readEp = readEp;
00107 }
00108
00109 void SocketRoutingQueue::ClearUsbDevice()
00110 {
00111 scoped_lock lock(m_mutex);
00112 m_dev = 0;
00113 lock.unlock();
00114
00115
00116
00117 scoped_lock wait(m_readwaitMutex);
00118 pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
00119 }
00120
00121 bool SocketRoutingQueue::UsbDeviceReady()
00122 {
00123 scoped_lock lock(m_mutex);
00124 return m_dev != 0;
00125 }
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136 void SocketRoutingQueue::AllocateBuffers(int count)
00137 {
00138 int todo = count - m_free.size();
00139
00140 for( int i = 0; i < todo; i++ ) {
00141
00142 m_free.push( new Data );
00143 }
00144 }
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
00158 {
00159 DataHandle buf = DefaultRead(timeout);
00160 if( !buf.get() )
00161 return false;
00162
00163
00164 receive = *buf.get();
00165 return true;
00166 }
00167
00168
00169
00170
00171 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
00172 {
00173
00174 Data *buf = m_default.wait_pop(timeout);
00175 return DataHandle(*this, buf);
00176 }
00177
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196 void SocketRoutingQueue::RegisterInterest(SocketId socket,
00197 SocketDataHandler handler,
00198 void *context)
00199 {
00200
00201 scoped_lock lock(m_mutex);
00202
00203 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00204 if( qi != m_socketQueues.end() )
00205 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
00206
00207 m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler, context) );
00208 m_interest = true;
00209 }
00210
00211
00212
00213
00214
00215
00216
00217
00218 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
00219 {
00220
00221 scoped_lock lock(m_mutex);
00222
00223 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00224 if( qi == m_socketQueues.end() )
00225 return;
00226
00227
00228 m_free.append_from( qi->second->m_queue );
00229
00230
00231 m_socketQueues.erase( qi );
00232
00233
00234 m_interest = m_socketQueues.size() > 0;
00235 }
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
00255 {
00256 DataHandle buf = SocketRead(socket, timeout);
00257 if( !buf.get() )
00258 return false;
00259
00260
00261 receive = *buf.get();
00262 return true;
00263 }
00264
00265
00266
00267
00268
00269
00270
00271 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
00272 {
00273 QueueEntryPtr qep;
00274 DataQueue *dq = 0;
00275
00276
00277 {
00278 scoped_lock lock(m_mutex);
00279 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00280 if( qi == m_socketQueues.end() )
00281 throw std::logic_error("SocketRead requested data from unregistered socket.");
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294 qep = qi->second;
00295 dq = &qep->m_queue;
00296 }
00297
00298
00299 Data *buf = dq->wait_pop(timeout);
00300
00301
00302
00303 {
00304 scoped_lock lock(m_mutex);
00305 qep.reset();
00306 }
00307
00308 return DataHandle(*this, buf);
00309 }
00310
00311
00312 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
00313 {
00314 scoped_lock lock(m_mutex);
00315 SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
00316 if( qi == m_socketQueues.end() )
00317 return false;
00318 return qi->second->m_queue.size() > 0;
00319 }
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336 bool SocketRoutingQueue::DoRead(std::string &msg, int timeout)
00337 {
00338 class ReadWaitSignal
00339 {
00340 pthread_mutex_t &m_Mutex;
00341 pthread_cond_t &m_Cond;
00342 public:
00343 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
00344 : m_Mutex(mut), m_Cond(cond)
00345 {}
00346 ~ReadWaitSignal()
00347 {
00348 scoped_lock wait(m_Mutex);
00349 pthread_cond_signal(&m_Cond);
00350 }
00351 } readwait(m_readwaitMutex, m_readwaitCond);
00352
00353 Usb::Device * volatile dev = 0;
00354 int readEp;
00355 DataHandle buf(*this, 0);
00356
00357
00358 {
00359 scoped_lock lock(m_mutex);
00360
00361 if( !m_dev ) {
00362 lock.unlock();
00363
00364
00365 usleep(125000);
00366 return true;
00367 }
00368
00369 dev = m_dev;
00370 readEp = m_readEp;
00371
00372
00373 Data *raw = m_free.pop();
00374 if( !raw )
00375 buf = DataHandle(*this, new Data);
00376 else
00377 buf = DataHandle(*this, raw);
00378 }
00379
00380
00381
00382 try {
00383
00384 Data &data = *buf.get();
00385
00386 if( !dev->BulkRead(readEp, data, timeout) )
00387 return true;
00388
00389 MAKE_PACKET(pack, data);
00390
00391
00392 if( data.GetSize() < sizeof(pack->socket) )
00393 return true;
00394
00395
00396 uint16_t socket = btohs(pack->socket);
00397
00398
00399
00400 scoped_lock lock(m_mutex);
00401
00402
00403 if( m_interest ) {
00404 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00405 if( qi != m_socketQueues.end() ) {
00406 SocketDataHandler sdh = qi->second->m_handler;
00407 void *ctx = qi->second->m_context;
00408
00409
00410 if( sdh ) {
00411
00412 lock.unlock();
00413 (*sdh)(ctx, buf.get());
00414 return true;
00415 }
00416 else {
00417 qi->second->m_queue.push(buf.release());
00418 return true;
00419 }
00420 }
00421
00422
00423 }
00424
00425
00426 lock.unlock();
00427
00428
00429 m_default.push(buf.release());
00430 return true;
00431
00432 }
00433 catch( Usb::Timeout & ) {
00434
00435 }
00436 catch( Usb::Error &ue ) {
00437
00438
00439 msg = ue.what();
00440 return false;
00441 }
00442
00443 return true;
00444 }
00445
00446 void SocketRoutingQueue::SpinoffSimpleReadThread()
00447 {
00448
00449 if( m_continue_reading )
00450 return;
00451 m_continue_reading = true;
00452
00453
00454 int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
00455 if( ret ) {
00456 m_continue_reading = false;
00457 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret);
00458 }
00459 }
00460
00461 }
00462