router.cc

Go to the documentation of this file.
00001 ///
00002 /// \file       router.cc
00003 ///             Support classes for the pluggable socket routing system.
00004 ///
00005 
00006 /*
00007     Copyright (C) 2008-2010, Net Direct Inc. (http://www.netdirect.ca/)
00008 
00009     This program is free software; you can redistribute it and/or modify
00010     it under the terms of the GNU General Public License as published by
00011     the Free Software Foundation; either version 2 of the License, or
00012     (at your option) any later version.
00013 
00014     This program is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
00017 
00018     See the GNU General Public License in the COPYING file at the
00019     root directory of this project for more details.
00020 */
00021 
00022 #include "router.h"
00023 #include "scoped_lock.h"
00024 #include "data.h"
00025 #include "protostructs.h"
00026 #include "usbwrap.h"
00027 #include "endian.h"
00028 #include "debug.h"
00029 #include <unistd.h>
00030 
00031 namespace Barry {
00032 
00033 ///////////////////////////////////////////////////////////////////////////////
00034 // SocketRoutingQueue constructors
00035 
00036 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count)
00037         : m_dev(0)
00038         , m_writeEp(0)
00039         , m_readEp(0)
00040         , m_interest(false)
00041         , m_continue_reading(false)
00042 {
00043         pthread_mutex_init(&m_mutex, NULL);
00044 
00045         pthread_mutex_init(&m_readwaitMutex, NULL);
00046         pthread_cond_init(&m_readwaitCond, NULL);
00047 
00048         AllocateBuffers(prealloc_buffer_count);
00049 }
00050 
00051 SocketRoutingQueue::~SocketRoutingQueue()
00052 {
00053         // thread running?
00054         if( m_continue_reading ) {
00055                 m_continue_reading = false;
00056                 pthread_join(m_usb_read_thread, NULL);
00057         }
00058 }
00059 
00060 ///////////////////////////////////////////////////////////////////////////////
00061 // protected members
00062 
00063 //
00064 // ReturnBuffer
00065 //
00066 /// Provides a method of returning a buffer to the free queue
00067 /// after processing.  The DataHandle class calls this automatically
00068 /// from its destructor.
00069 void SocketRoutingQueue::ReturnBuffer(Data *buf)
00070 {
00071         // don't need to lock here, since m_free handles its own locking
00072         m_free.push(buf);
00073 }
00074 
00075 //
00076 // SimpleReadThread()
00077 //
00078 /// Convenience thread to handle USB read activity.
00079 ///
00080 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
00081 {
00082         SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
00083 
00084         // read from USB and write to stdout until finished
00085         std::string msg;
00086         while( q->m_continue_reading ) {
00087                 if( !q->DoRead(msg, 1000) ) {   // timeout in milliseconds
00088                         eout("Error in SimpleReadThread: " << msg);
00089                 }
00090         }
00091         return 0;
00092 }
00093 
00094 
00095 ///////////////////////////////////////////////////////////////////////////////
00096 // public API
00097 
00098 // These functions connect the router to an external Usb::Device
00099 // object.  Normally this is handled automatically by the
00100 // Controller class, but are public here in case they are needed.
00101 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp)
00102 {
00103         scoped_lock lock(m_mutex);
00104         m_dev = dev;
00105         m_writeEp = writeEp;
00106         m_readEp = readEp;
00107 }
00108 
00109 void SocketRoutingQueue::ClearUsbDevice()
00110 {
00111         scoped_lock lock(m_mutex);
00112         m_dev = 0;
00113         lock.unlock();
00114 
00115         // wait for the DoRead cycle to finish, so the external
00116         // Usb::Device object doesn't close before we're done with it
00117         scoped_lock wait(m_readwaitMutex);
00118         pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
00119 }
00120 
00121 bool SocketRoutingQueue::UsbDeviceReady()
00122 {
00123         scoped_lock lock(m_mutex);
00124         return m_dev != 0;
00125 }
00126 
00127 //
00128 // AllocateBuffers
00129 //
00130 /// This class starts out with no buffers, and will grow one buffer
00131 /// at a time if needed.  Call this to allocate count buffers
00132 /// all at once and place them on the free queue.  After calling
00133 /// this function, at least count buffers will exist in the free
00134 /// queue.  If there are already count buffers, none will be added.
00135 ///
00136 void SocketRoutingQueue::AllocateBuffers(int count)
00137 {
00138         int todo = count - m_free.size();
00139 
00140         for( int i = 0; i < todo; i++ ) {
00141                 // m_free handles its own locking
00142                 m_free.push( new Data );
00143         }
00144 }
00145 
00146 //
00147 // DefaultRead (both variations)
00148 //
00149 /// Returns the data for the next unregistered socket.
00150 /// Blocks until timeout or data is available.
00151 /// Returns false (or null pointer) on timeout and no data.
00152 /// With the return version of the function, there is no
00153 /// copying performed.
00154 ///
00155 /// This version performs a copy.
00156 ///
00157 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
00158 {
00159         DataHandle buf = DefaultRead(timeout);
00160         if( !buf.get() )
00161                 return false;
00162 
00163         // copy to desired buffer
00164         receive = *buf.get();
00165         return true;
00166 }
00167 
00168 ///
00169 /// This version does not perform a copy.
00170 ///
00171 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
00172 {
00173         // m_default handles its own locking
00174         Data *buf = m_default.wait_pop(timeout);
00175         return DataHandle(*this, buf);
00176 }
00177 
00178 //
00179 // RegisterInterest
00180 //
00181 /// Register an interest in data from a certain socket.  To read
00182 /// from that socket, use the SocketRead() function from then on.
00183 ///
00184 /// Any non-registered socket goes in the default queue
00185 /// and must be read by DefaultRead()
00186 ///
00187 /// If not null, handler is called when new data is read.  It will
00188 /// be called in the same thread instance that DoRead() is called from.
00189 /// Handler is passed the DataQueue Data pointer, and so no
00190 /// copying is done.  Once the handler returns, the data is
00191 /// considered processed and not added to the interested queue,
00192 /// but instead returned to m_free.
00193 ///
00194 /// Throws std::logic_error if already registered.
00195 ///
00196 void SocketRoutingQueue::RegisterInterest(SocketId socket,
00197                                           SocketDataHandler handler,
00198                                           void *context)
00199 {
00200         // modifying our own std::map, need a lock
00201         scoped_lock lock(m_mutex);
00202 
00203         SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00204         if( qi != m_socketQueues.end() )
00205                 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
00206 
00207         m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler, context) );
00208         m_interest = true;
00209 }
00210 
00211 //
00212 // UnregisterInterest
00213 //
00214 /// Unregisters interest in data from the given socket, and discards
00215 /// any existing data in its interest queue.  Any new incoming data
00216 /// for this socket will be placed in the default queue.
00217 ///
00218 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
00219 {
00220         // modifying our own std::map, need a lock
00221         scoped_lock lock(m_mutex);
00222 
00223         SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00224         if( qi == m_socketQueues.end() )
00225                 return; // nothing registered, done
00226 
00227         // salvage all our data buffers
00228         m_free.append_from( qi->second->m_queue );
00229 
00230         // remove the QueueEntryPtr from the map
00231         m_socketQueues.erase( qi );
00232 
00233         // check the interest flag
00234         m_interest = m_socketQueues.size() > 0;
00235 }
00236 
00237 //
00238 // SocketRead
00239 //
00240 /// Reads data from the interested socket cache.  Can only read
00241 /// from sockets that have been previously registered.
00242 ///
00243 /// Blocks until timeout or data is available.
00244 ///
00245 /// Returns false (or null pointer) on timeout and no data.
00246 /// With the return version of the function, there is no
00247 /// copying performed.
00248 ///
00249 /// Throws std::logic_error if a socket was requested that was
00250 /// not previously registered.
00251 ///
00252 /// Copying is performed with this function.
00253 ///
00254 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
00255 {
00256         DataHandle buf = SocketRead(socket, timeout);
00257         if( !buf.get() )
00258                 return false;
00259 
00260         // copy to desired buffer
00261         receive = *buf.get();
00262         return true;
00263 }
00264 
00265 ///
00266 /// Copying is not performed with this function.
00267 ///
00268 /// Throws std::logic_error if a socket was requested that was
00269 /// not previously registered.
00270 ///
00271 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
00272 {
00273         QueueEntryPtr qep;
00274         DataQueue *dq = 0;
00275 
00276         // accessing our own std::map, need a lock
00277         {
00278                 scoped_lock lock(m_mutex);
00279                 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00280                 if( qi == m_socketQueues.end() )
00281                         throw std::logic_error("SocketRead requested data from unregistered socket.");
00282 
00283                 // got our queue, save the whole QueueEntryPtr (shared_ptr),
00284                 // and unlock, since we will be waiting on the DataQueue,
00285                 // not the socketQueues map
00286                 //
00287                 // This is safe, since even if UnregisterInterest is called,
00288                 // our pointer won't be deleted until our shared_ptr
00289                 // (QueueEntryPtr) goes out of scope.
00290                 //
00291                 // The remaining problem is that wait_pop() might wait
00292                 // forever if there is no timeout... c'est la vie.
00293                 // Should'a used a timeout. :-)
00294                 qep = qi->second;
00295                 dq = &qep->m_queue;
00296         }
00297 
00298         // get data from DataQueue
00299         Data *buf = dq->wait_pop(timeout);
00300 
00301         // specifically delete our copy of shared pointer, in a locked
00302         // environment
00303         {
00304                 scoped_lock lock(m_mutex);
00305                 qep.reset();
00306         }
00307 
00308         return DataHandle(*this, buf);
00309 }
00310 
00311 // Returns true if data is available for that socket.
00312 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
00313 {
00314         scoped_lock lock(m_mutex);
00315         SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
00316         if( qi == m_socketQueues.end() )
00317                 return false;
00318         return qi->second->m_queue.size() > 0;
00319 }
00320 
00321 //
00322 // DoRead
00323 //
00324 /// Called by the application's "read thread" to read the next usb
00325 /// packet and route it to the correct queue.  Returns after every
00326 /// read, even if a handler is associated with a queue.
00327 /// Note: this function is safe to call before SetUsbDevice() is
00328 /// called... it just doesn't do anything if there is no usb
00329 /// device to work with.
00330 ///
00331 /// Timeout is in milliseconds.
00332 ///
00333 /// Returns false in the case of USB errors and puts the error message
00334 /// in msg.
00335 ///
00336 bool SocketRoutingQueue::DoRead(std::string &msg, int timeout)
00337 {
00338         class ReadWaitSignal
00339         {
00340                 pthread_mutex_t &m_Mutex;
00341                 pthread_cond_t &m_Cond;
00342         public:
00343                 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
00344                         : m_Mutex(mut), m_Cond(cond)
00345                         {}
00346                 ~ReadWaitSignal()
00347                 {
00348                         scoped_lock wait(m_Mutex);
00349                         pthread_cond_signal(&m_Cond);
00350                 }
00351         } readwait(m_readwaitMutex, m_readwaitCond);
00352 
00353         Usb::Device * volatile dev = 0;
00354         int readEp;
00355         DataHandle buf(*this, 0);
00356 
00357         // if we are not connected to a USB device yet, just wait
00358         {
00359                 scoped_lock lock(m_mutex);
00360 
00361                 if( !m_dev ) {
00362                         lock.unlock();  // unlock early, since we're sleeping
00363                         // sleep only a short time, since things could be
00364                         // in the process of setup or teardown
00365                         usleep(125000);
00366                         return true;
00367                 }
00368 
00369                 dev = m_dev;
00370                 readEp = m_readEp;
00371 
00372                 // fetch a free buffer
00373                 Data *raw = m_free.pop();
00374                 if( !raw )
00375                         buf = DataHandle(*this, new Data);
00376                 else
00377                         buf = DataHandle(*this, raw);
00378         }
00379 
00380         // take a chance and do the read unlocked, as this has the potential
00381         // for blocking for a while
00382         try {
00383 
00384                 Data &data = *buf.get();
00385 
00386                 if( !dev->BulkRead(readEp, data, timeout) )
00387                         return true;    // no data, done!
00388 
00389                 MAKE_PACKET(pack, data);
00390 
00391                 // make sure the size is right
00392                 if( data.GetSize() < sizeof(pack->socket) )
00393                         return true;    // bad size, just skip
00394 
00395                 // extract the socket from the packet
00396                 uint16_t socket = btohs(pack->socket);
00397 
00398                 // we have data, now lock up again to place it
00399                 // in the right queue
00400                 scoped_lock lock(m_mutex);
00401 
00402                 // search for registration of socket
00403                 if( m_interest ) {
00404                         SocketQueueMap::iterator qi = m_socketQueues.find(socket);
00405                         if( qi != m_socketQueues.end() ) {
00406                                 SocketDataHandler sdh = qi->second->m_handler;
00407                                 void *ctx = qi->second->m_context;
00408 
00409                                 // is there a handler?
00410                                 if( sdh ) {
00411                                         // unlock & let the handler process it
00412                                         lock.unlock();
00413                                         (*sdh)(ctx, buf.get());
00414                                         return true;
00415                                 }
00416                                 else {
00417                                         qi->second->m_queue.push(buf.release());
00418                                         return true;
00419                                 }
00420                         }
00421 
00422                         // fall through
00423                 }
00424 
00425                 // safe to unlock now, we are done with the map
00426                 lock.unlock();
00427 
00428                 // if we get here, send to default queue
00429                 m_default.push(buf.release());
00430                 return true;
00431 
00432         }
00433         catch( Usb::Timeout & ) {
00434                 // this is expected... just ignore
00435         }
00436         catch( Usb::Error &ue ) {
00437                 // this is unexpected, but we're in a thread here...
00438                 // return false and the caller decide how to handle it
00439                 msg = ue.what();
00440                 return false;
00441         }
00442 
00443         return true;
00444 }
00445 
00446 void SocketRoutingQueue::SpinoffSimpleReadThread()
00447 {
00448         // signal that it's ok to run inside the thread
00449         if( m_continue_reading )
00450                 return; // already running
00451         m_continue_reading = true;
00452 
00453         // Start USB read thread, to handle all routing
00454         int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
00455         if( ret ) {
00456                 m_continue_reading = false;
00457                 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret);
00458         }
00459 }
00460 
00461 } // namespace Barry
00462 

Generated on 29 Mar 2010 for Barry by  doxygen 1.6.1