#include <Reactor.h>
Public Member Functions | |
Reactor () | |
Constructor. | |
~Reactor () | |
Destructor. | |
TimerId | registerTimerHandler (EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>") |
Register Timer Event handler with Reactor. | |
bool | registerIOHandler (EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS) |
Register I/O Event handler with Reactor. | |
bool | removeHandler (EventHandler *eh_, EventType et_=ALL_EVENTS) |
Remove Event handler from reactor for either all I/O events or timeout event or both. | |
bool | removeTimerHandler (TimerId id_) |
Remove Timer event from the queue. | |
bool | removeIOHandler (handler_t fd_) |
Remove IO Event handler from reactor. | |
void | waitForEvents (void) |
Main waiting loop that blocks indefinitely processing events. | |
void | waitForEvents (TimeVal *tv_) |
Wait for events for time specified. | |
void | stopReactor (void) |
Stop Reactor's activity. | |
void | deactivate (void) |
Deactivate Reactor. | |
Private Types | |
typedef std::map< u_int, EventHandler * > | Fd2Eh_Map_Type |
no cloning | |
typedef Fd2Eh_Map_Type::iterator | Fd2Eh_Map_Iter |
Private Member Functions | |
Reactor (const Reactor &) | |
Reactor & | operator= (const Reactor &) |
no cloning | |
void | adjust_maxfdp1 (handler_t fd_) |
Adjust maxfdp1 in a portable way (win32 ignores maxfd alltogether). | |
bool | handleError (void) |
Handle error in select(2) loop appropriately. | |
bool | dispatch (int minimum_) |
Notify all EventHandlers registered on respecful events occured. | |
int | isAnyReady (void) |
Return number of file descriptors ready accross all sets. | |
bool | checkFDs (void) |
Check mask for bad file descriptors. | |
void | dispatchHandler (FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_) |
Call handler's callback and, if callback returns negative value, remove it from the Reactor. | |
void | calculateTimeout (TimeVal *&howlong_, TimeVal *maxwait_) |
Calculate closest timeout. | |
Private Attributes | |
int | m_fd_setsize |
Max number of open files per process. | |
handler_t | m_maxfd_plus1 |
Max file descriptor number (in all sets) plus 1. | |
bool | m_active |
Flag that indicates whether Reactor is active or had been stopped. | |
Fd2Eh_Map_Type | m_readSet |
Event handlers awaiting on READ_EVENT. | |
Fd2Eh_Map_Type | m_writeSet |
Event handlers awaiting on WRITE_EVENT. | |
Fd2Eh_Map_Type | m_exceptSet |
Event handlers awaiting on EXCEPT_EVENT. | |
MaskSet | m_waitSet |
Handlers to wait for event on. | |
MaskSet | m_readySet |
Handlers that are ready for processing. | |
TimerQueue | m_tqueue |
The queue of Timers. |
Definition at line 57 of file Reactor.h.
typedef std::map<u_int, EventHandler*> ASSA::Reactor::Fd2Eh_Map_Type [private] |
typedef Fd2Eh_Map_Type::iterator ASSA::Reactor::Fd2Eh_Map_Iter [private] |
Reactor::Reactor | ( | ) |
Constructor.
Maximum number of sockets supported (per process) Win32 defines it to 64 in winsock2.h.
Initialize winsock2 library
Definition at line 24 of file Reactor.cpp.
References m_fd_setsize, ASSA::REACTTRACE, and trace_with_mask.
00024 : 00025 m_fd_setsize (1024), 00026 m_maxfd_plus1 (0), 00027 m_active (true) 00028 { 00029 trace_with_mask("Reactor::Reactor",REACTTRACE); 00030 00034 #if defined(WIN32) 00035 m_fd_setsize = FD_SETSIZE; 00036 00037 #else // POSIX 00038 struct rlimit rlim; 00039 rlim.rlim_max = 0; 00040 00041 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) { 00042 m_fd_setsize = rlim.rlim_cur; 00043 } 00044 #endif 00045 00048 #if defined (WIN32) 00049 WSADATA data; 00050 WSAStartup (MAKEWORD (2, 2), &data); 00051 #endif 00052 }
Reactor::~Reactor | ( | ) |
Destructor.
Definition at line 55 of file Reactor.cpp.
References deactivate(), m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask.
00056 { 00057 trace_with_mask("Reactor::~Reactor",REACTTRACE); 00058 00059 m_readSet.clear (); 00060 m_writeSet.clear (); 00061 m_exceptSet.clear (); 00062 deactivate (); 00063 }
ASSA::Reactor::Reactor | ( | const Reactor & | ) | [private] |
TimerId Reactor::registerTimerHandler | ( | EventHandler * | eh_, | |
const TimeVal & | tv_, | |||
const std::string & | name_ = "<unknown>" | |||
) |
Register Timer Event handler with Reactor.
Reactor will dispatch appropriate callback when event of EventType is received.
eh_ | Pointer to the EventHandler | |
tv_ | Timeout value | |
name_ | Name of the timer |
Definition at line 67 of file Reactor.cpp.
References Assure_return, DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::fmtString(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::insert(), m_tqueue, ASSA::TimeVal::msec(), ASSA::REACT, ASSA::REACTTRACE, ASSA::TimeVal::sec(), and trace_with_mask.
Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), and ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().
00070 { 00071 trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE); 00072 Assure_return (eh_); 00073 00074 TimeVal now (TimeVal::gettimeofday()); 00075 TimeVal t (now + timeout_); 00076 00077 DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n", 00078 timeout_.sec(),timeout_.msec())); 00079 DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() )); 00080 DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() )); 00081 00082 TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_); 00083 00084 DL((REACT,"---Modified Timer Queue----\n")); 00085 m_tqueue.dump(); 00086 DL((REACT,"---------------------------\n")); 00087 00088 return (tid); 00089 }
bool Reactor::registerIOHandler | ( | EventHandler * | eh_, | |
handler_t | fd_, | |||
EventType | et_ = RWE_EVENTS | |||
) |
Register I/O Event handler with Reactor.
Reactor will dispatch appropriate callback when event of EventType is received.
eh_ | Pointer to the EventHandler | |
fd_ | File descriptor | |
et_ | Event Type |
Definition at line 93 of file Reactor.cpp.
References ASSA::ASSAERR, Assure_return, DL, ASSA::MaskSet::dump(), ASSA::ends(), ASSA::EventHandler::get_id(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isSignalEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_maxfd_plus1, m_readSet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::FdSet::setFd(), and trace_with_mask.
Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::RemoteLogger::log_open(), and ASSA::Acceptor< SERVICE_HANDLER, PEER_ACCEPTOR >::open().
00094 { 00095 trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE); 00096 00097 std::ostringstream msg; 00098 Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_)); 00099 00100 if (isReadEvent (et_)) 00101 { 00102 if (!m_waitSet.m_rset.setFd (fd_)) 00103 { 00104 DL((ASSAERR,"readset: fd %d out of range\n", fd_)); 00105 return (false); 00106 } 00107 m_readSet[fd_] = eh_; 00108 msg << "READ_EVENT"; 00109 } 00110 00111 if (isWriteEvent (et_)) 00112 { 00113 if (!m_waitSet.m_wset.setFd (fd_)) 00114 { 00115 DL((ASSAERR,"writeset: fd %d out of range\n", fd_)); 00116 return (false); 00117 } 00118 m_writeSet[fd_] = eh_; 00119 msg << " WRITE_EVENT"; 00120 } 00121 00122 if (isExceptEvent (et_)) 00123 { 00124 if (!m_waitSet.m_eset.setFd (fd_)) 00125 { 00126 DL((ASSAERR,"exceptset: fd %d out of range\n", fd_)); 00127 return (false); 00128 } 00129 m_exceptSet[fd_] = eh_; 00130 msg << " EXCEPT_EVENT"; 00131 } 00132 msg << std::ends; 00133 00134 DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n", 00135 eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () )); 00136 00137 #if !defined (WIN32) 00138 if (m_maxfd_plus1 < fd_+1) { 00139 m_maxfd_plus1 = fd_+1; 00140 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1)); 00141 } 00142 #endif 00143 00144 DL((REACT,"Modified waitSet:\n")); 00145 m_waitSet.dump (); 00146 00147 return (true); 00148 }
bool Reactor::removeHandler | ( | EventHandler * | eh_, | |
EventType | et_ = ALL_EVENTS | |||
) |
Remove Event handler from reactor for either all I/O events or timeout event or both.
Remove handler from all events that matches event_.
If et_ is TIMEOUT_EVENT, all timers associated with Event Handler eh_ will be removed.
eh_ | Pointer to the EventHandler | |
et_ | Event Type to remove. Default will remove Event Handler for all events. |
Definition at line 173 of file Reactor.cpp.
References adjust_maxfdp1(), ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.
Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_timeout(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write(), ASSA::RemoteLogger::log_close(), and stopReactor().
00174 { 00175 trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE); 00176 00177 bool ret = false; 00178 handler_t fd; 00179 Fd2Eh_Map_Iter iter; 00180 00181 if (eh_ == NULL) { 00182 return false; 00183 } 00184 00185 if (isTimeoutEvent (event_)) { 00186 ret = m_tqueue.remove (eh_); 00187 ret = true; 00188 } 00189 00190 if (isReadEvent (event_)) { 00191 iter = m_readSet.begin (); 00192 while (iter != m_readSet.end ()) { 00193 if ((*iter).second == eh_) { 00194 fd = (*iter).first; 00195 m_readSet.erase (iter); 00196 m_waitSet.m_rset.clear (fd); 00197 ret = true; 00198 break; 00199 } 00200 iter++; 00201 } 00202 } 00203 00204 if (isWriteEvent (event_)) { 00205 iter = m_writeSet.begin (); 00206 while (iter != m_writeSet.end ()) { 00207 if ((*iter).second == eh_) { 00208 fd = (*iter).first; 00209 m_writeSet.erase (iter); 00210 m_waitSet.m_wset.clear (fd); 00211 ret = true; 00212 break; 00213 } 00214 iter++; 00215 } 00216 } 00217 00218 if (isExceptEvent (event_)) { 00219 iter = m_exceptSet.begin (); 00220 while (iter != m_exceptSet.end ()) { 00221 if ((*iter).second == eh_) { 00222 fd = (*iter).first; 00223 m_exceptSet.erase (iter); 00224 m_waitSet.m_eset.clear (fd); 00225 ret = true; 00226 break; 00227 } 00228 iter++; 00229 } 00230 } 00231 00232 if (ret == true) { 00233 DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_)); 00234 eh_->handle_close (fd); 00235 } 00236 00237 adjust_maxfdp1 (fd); 00238 00239 DL((REACT,"Modifies waitSet:\n")); 00240 m_waitSet.dump (); 00241 00242 return (ret); 00243 }
bool Reactor::removeTimerHandler | ( | TimerId | id_ | ) |
Remove Timer event from the queue.
This removes particular event.
id_ | Timer Id returned by registerTimer. |
Definition at line 152 of file Reactor.cpp.
References ASSA::ASSAERR, DL, ASSA::TimerQueue::dump(), EL, m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.
Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write().
00153 { 00154 trace_with_mask("Reactor::removeTimer",REACTTRACE); 00155 bool ret; 00156 00157 if ((ret = m_tqueue.remove (tid_))) { 00158 DL((REACT,"---Modified Timer Queue----\n")); 00159 m_tqueue.dump(); 00160 DL((REACT,"---------------------------\n")); 00161 } 00162 else { 00163 EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ )); 00164 } 00165 return (ret); 00166 }
bool Reactor::removeIOHandler | ( | handler_t | fd_ | ) |
Remove IO Event handler from reactor.
This will remove handler from receiving all I/O events.
fd_ | File descriptor |
We clear m_readySet mask here as well, because if we don't, it will be erroneously used by isAnyReady() before select().
Definition at line 247 of file Reactor.cpp.
References adjust_maxfdp1(), Assure_return, ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::is_valid_handler(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.
Referenced by checkFDs(), and dispatchHandler().
00248 { 00249 trace_with_mask("Reactor::removeIOHandler",REACTTRACE); 00250 00251 bool ret = false; 00252 EventHandler* ehp = NULL; 00253 Fd2Eh_Map_Iter iter; 00254 00255 Assure_return (ASSA::is_valid_handler (fd_)); 00256 00257 DL((REACT,"Removing handler for fd=%d\n",fd_)); 00258 00263 if ((iter = m_readSet.find (fd_)) != m_readSet.end ()) 00264 { 00265 ehp = (*iter).second; 00266 m_readSet.erase (iter); 00267 m_waitSet.m_rset.clear (fd_); 00268 m_readySet.m_rset.clear (fd_); 00269 if (m_readSet.size () > 0) { 00270 iter = m_readSet.end (); 00271 iter--; 00272 } 00273 ret = true; 00274 } 00275 00276 if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ()) 00277 { 00278 ehp = (*iter).second; 00279 m_writeSet.erase (iter); 00280 m_waitSet.m_wset.clear (fd_); 00281 m_readySet.m_wset.clear (fd_); 00282 if (m_writeSet.size () > 0) { 00283 iter = m_writeSet.end (); 00284 iter--; 00285 } 00286 ret = true; 00287 } 00288 00289 if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ()) 00290 { 00291 ehp = (*iter).second; 00292 m_exceptSet.erase (iter); 00293 m_waitSet.m_eset.clear (fd_); 00294 m_readySet.m_eset.clear (fd_); 00295 if (m_exceptSet.size () > 0) { 00296 iter = m_exceptSet.end (); 00297 iter--; 00298 } 00299 ret = true; 00300 } 00301 00302 if (ret == true && ehp != NULL) { 00303 DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp)); 00304 ehp->handle_close (fd_); 00305 } 00306 00307 adjust_maxfdp1 (fd_); 00308 00309 DL((REACT,"Modifies waitSet:\n")); 00310 m_waitSet.dump (); 00311 00312 return (ret); 00313 }
void Reactor::waitForEvents | ( | void | ) |
Main waiting loop that blocks indefinitely processing events.
Block forever version.
Definition at line 470 of file Reactor.cpp.
References m_active.
Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().
00471 { 00472 while ( m_active ) { 00473 waitForEvents ((TimeVal*) NULL); 00474 } 00475 }
void Reactor::waitForEvents | ( | TimeVal * | tv_ | ) |
Wait for events for time specified.
===================================================================== | select() | errno | Events | Behavior | |===================================================================| | < 0 | EINTR | Interrup by signal | Retry | +----------+-------+---------------------+--------------------------+ | < 0 | EBADF | Bad file descriptor | Remove bad fds and retry | | | | | and retry | +----------+-------+---------------------+--------------------------+ | < 0 | others| Some other error | Fall through | +----------+-------+---------------------+--------------------------+ | == 0 | 0 | Timed out | Fall through | +----------+-------+---------------------+--------------------------+ | > 0 | 0 | Got some work to do | Fall through | +-------------------------------------------------------------------+
Passing NULL replicates behavior of waitForEvents(void). Passing tv_ {0, 0} will cause non-blocking polling for all events. This method blocks up to tv_ time interval processing event. If an event occurs, it will process event(s) and return. tv_ time is adjusted by substracting time spent in event processing.
tv_ | [RW] is time to wait for. |
Definition at line 495 of file Reactor.cpp.
References calculateTimeout(), dispatch(), DL, ASSA::MaskSet::dump(), ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), handleError(), isAnyReady(), m_active, ASSA::MaskSet::m_eset, m_maxfd_plus1, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::MaskSet::reset(), ASSA::MaskSet::sync(), and trace_with_mask.
00496 { 00497 trace_with_mask("Reactor::waitForEvents",REACTTRACE); 00498 00499 TimerCountdown traceTime (tv_); 00500 DL((REACT,"======================================\n")); 00501 00502 /*--- Expire all stale Timers ---*/ 00503 m_tqueue.expire (TimeVal::gettimeofday ()); 00504 00505 /* Test to see if Reactor has been deactivated as a result 00506 * of processing done by any TimerHandlers. 00507 */ 00508 if (!m_active) { 00509 return; 00510 } 00511 00512 int nReady; 00513 TimeVal delay; 00514 TimeVal* dlp = &delay; 00515 00516 /*--- 00517 In case if not all data have been processed by the EventHandler, 00518 and EventHandler stated so in its callback's return value 00519 to dispatcher (), it will be called again. This way 00520 underlying file/socket stream can efficiently utilize its 00521 buffering mechaninsm. 00522 ---*/ 00523 if ((nReady = isAnyReady ())) { 00524 DL((REACT,"isAnyReady returned: %d\n",nReady)); 00525 dispatch (nReady); 00526 return; 00527 } 00528 00529 DL((REACT,"=== m_waitSet ===\n")); 00530 m_waitSet.dump (); 00531 00532 do { 00533 m_readySet.reset (); 00534 DL ((REACT,"m_readySet after reset():\n")); 00535 m_readySet.dump (); 00536 00537 m_readySet = m_waitSet; 00538 DL ((REACT,"m_readySet after assign:\n")); 00539 m_readySet.dump (); 00540 00541 calculateTimeout (dlp, tv_); 00542 00543 nReady = ::select (m_maxfd_plus1, 00544 &m_readySet.m_rset, 00545 &m_readySet.m_wset, 00546 &m_readySet.m_eset, 00547 dlp); 00548 DL((REACT,"::select() returned: %d\n",nReady)); 00549 00550 m_readySet.sync (); 00551 DL ((REACT,"m_readySet after select:\n")); 00552 m_readySet.dump (); 00553 00554 } 00555 while (nReady < 0 && handleError ()); 00556 00557 dispatch (nReady); 00558 }
void Reactor::stopReactor | ( | void | ) |
Stop Reactor's activity.
This effectively removes all handlers from under Reactor's supervision. As of now, there is no way to re-activate the Reactor. This method is typically called from method other then EventHandler::signal_handler(). EventHandler::handle_read () is a good candidate. Calling it from EventHandler::handle_close () will most likely cause an infinite loop of recursive calls.
Definition at line 667 of file Reactor.cpp.
References m_active, m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, removeHandler(), and trace_with_mask.
00668 { 00669 trace_with_mask("Reactor::stopReactor", REACTTRACE); 00670 00671 m_active = false; 00672 00673 Fd2Eh_Map_Iter iter; 00674 EventHandler* ehp; 00675 00676 while (m_readSet.size () > 0) { 00677 iter = m_readSet.begin (); 00678 ehp = (*iter).second; 00679 removeHandler (ehp); 00680 } 00681 00682 while (m_writeSet.size () > 0) { 00683 iter = m_writeSet.begin (); 00684 ehp = (*iter).second; 00685 removeHandler (ehp); 00686 } 00687 00688 while (m_exceptSet.size () > 0) { 00689 iter = m_exceptSet.begin (); 00690 ehp = (*iter).second; 00691 removeHandler (ehp); 00692 } 00693 }
void ASSA::Reactor::deactivate | ( | void | ) | [inline] |
Deactivate Reactor.
This function sets internal flag which notifies Reactor's internal event handling loop to abort its activity. It is mostly used when a *slow* system call is interrupted by the signal handler. The system call will be restarted by OS after control returns from the signal handler. Signal handler (GenServer::handle_signal()) should call this method to delay Reactor's deactivation.
Definition at line 234 of file Reactor.h.
References m_active.
Referenced by ASSA::GenServer::handle_signal(), ASSA::GenServer::stop_service(), and ~Reactor().
00234 { m_active = false; }
void Reactor::adjust_maxfdp1 | ( | handler_t | fd_ | ) | [private] |
Adjust maxfdp1 in a portable way (win32 ignores maxfd alltogether).
If the socket descriptor that has just been eliminated was the maxfd+1, we readjust to the next highest.
Win32 implementation of select() ignores this value altogether.
Definition at line 701 of file Reactor.cpp.
References DL, m_maxfd_plus1, m_waitSet, ASSA::MaskSet::max_fd(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.
Referenced by removeHandler(), and removeIOHandler().
00702 { 00703 #if !defined (WIN32) /* POSIX */ 00704 00705 trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE); 00706 00707 if (m_maxfd_plus1 == fd_ + 1) 00708 { 00709 m_maxfd_plus1 = m_waitSet.max_fd () + 1; 00710 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1)); 00711 } 00712 #endif 00713 }
bool Reactor::handleError | ( | void | ) | [private] |
Handle error in select(2) loop appropriately.
If commanded to stop, do so
Definition at line 341 of file Reactor.cpp.
References ASSA::ASSAERR, checkFDs(), DL, EL, m_active, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.
Referenced by waitForEvents().
00342 { 00343 trace_with_mask("Reactor::handleError",REACTTRACE); 00344 00347 if ( !m_active ) { 00348 DL((REACT,"Received cmd to stop Reactor\n")); 00349 return (false); 00350 } 00351 00352 /*--- 00353 TODO: If select(2) returns before time expires, with 00354 a descriptor ready or with EINTR, timeval is not 00355 going to be updated with number of seconds remaining. 00356 This is true for all systems except Linux, which will 00357 do so. Therefore, to restart correctly in case of 00358 EINTR, we ought to take time measurement before and 00359 after select, and try to select() for remaining time. 00360 00361 For now, we restart with the initial timing value. 00362 ---*/ 00363 /*--- 00364 BSD kernel never restarts select(2). SVR4 will restart if 00365 the SA_RESTART flag is specified when the signal handler 00366 for the signal delivered is installed. This means taht for 00367 portability, we must handle signal interrupts. 00368 ---*/ 00369 00370 if ( errno == EINTR ) { 00371 EL((REACT,"EINTR: interrupted select(2)\n")); 00372 /* 00373 If I was sitting in select(2) and received SIGTERM, 00374 the signal handler would have set m_active to 'false', 00375 and this function would have returned 'false' as above. 00376 For any other non-critical signals (USR1,...), 00377 we retry select. 00378 */ 00379 return (true); 00380 } 00381 /* 00382 EBADF - bad file number. One of the file descriptors does 00383 not reference an open file to open(), close(), ioctl(). 00384 This can happen if user closed fd and forgot to remove 00385 handler from Reactor. 00386 */ 00387 if ( errno == EBADF ) { 00388 DL((REACT,"EBADF: bad file descriptor\n")); 00389 return (checkFDs ()); 00390 } 00391 /* 00392 Any other error from select 00393 */ 00394 #if defined (WIN32) 00395 DL ((REACT,"select(3) error = %d\n", WSAGetLastError())); 00396 #else 00397 EL((ASSAERR,"select(3) error\n")); 00398 #endif 00399 return (false); 00400 }
bool Reactor::dispatch | ( | int | minimum_ | ) | [private] |
Notify all EventHandlers registered on respecful events occured.
Many UNIX systems will count a particular file descriptor in the ready_ only ONCE, even if it was flagged by select(2) in, say, both read and write masks.
minimum_ | number of file descriptors ready. |
Definition at line 626 of file Reactor.cpp.
References ASSA::ASSAERR, dispatchHandler(), DL, ASSA::MaskSet::dump(), EL, ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), ASSA::EventHandler::handle_except(), ASSA::EventHandler::handle_read(), ASSA::EventHandler::handle_write(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.
Referenced by waitForEvents().
00627 { 00628 trace_with_mask("Reactor::dispatch", REACTTRACE); 00629 00630 m_tqueue.expire (TimeVal::gettimeofday ()); 00631 00632 if ( ready_ < 0 ) 00633 { 00634 #if !defined (WIN32) 00635 EL((ASSAERR,"::select(3) error\n")); 00636 #endif 00637 return (false); 00638 } 00639 if ( ready_ == 0 ) { 00640 return (true); 00641 } 00642 00643 DL((REACT,"Dispatching %d FDs.\n",ready_)); 00644 DL((REACT,"m_readySet:\n")); 00645 m_readySet.dump (); 00646 00647 /*--- Writes first ---*/ 00648 dispatchHandler (m_readySet.m_wset, 00649 m_writeSet, 00650 &EventHandler::handle_write); 00651 00652 /*--- Exceptions next ---*/ 00653 dispatchHandler (m_readySet.m_eset, 00654 m_exceptSet, 00655 &EventHandler::handle_except); 00656 00657 /*--- Finally, the Reads ---*/ 00658 dispatchHandler (m_readySet.m_rset, 00659 m_readSet, 00660 &EventHandler::handle_read); 00661 00662 return (true); 00663 }
int Reactor::isAnyReady | ( | void | ) | [private] |
Return number of file descriptors ready accross all sets.
Definition at line 404 of file Reactor.cpp.
References DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_readySet, ASSA::MaskSet::m_rset, ASSA::MaskSet::m_wset, ASSA::FdSet::numSet(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.
Referenced by waitForEvents().
00405 { 00406 trace_with_mask("Reactor::isAnyReady",REACTTRACE); 00407 00408 int n = m_readySet.m_rset.numSet () + 00409 m_readySet.m_wset.numSet () + 00410 m_readySet.m_eset.numSet (); 00411 00412 if ( n > 0 ) { 00413 DL((REACT,"m_readySet: %d FDs are ready for processing\n", n)); 00414 m_readySet.dump (); 00415 } 00416 return (n); 00417 }
bool Reactor::checkFDs | ( | void | ) | [private] |
Check mask for bad file descriptors.
Definition at line 317 of file Reactor.cpp.
References ASSA::FdSet::clear(), DL, m_fd_setsize, m_readSet, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), ASSA::FdSet::setFd(), and trace_with_mask.
Referenced by handleError().
00318 { 00319 trace_with_mask("Reactor::checkFDs",REACTTRACE); 00320 00321 bool num_removed = false; 00322 FdSet mask; 00323 timeval poll = { 0, 0 }; 00324 00325 for (handler_t fd = 0; fd < m_fd_setsize; fd++) { 00326 if ( m_readSet[fd] != NULL ) { 00327 mask.setFd (fd); 00328 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) { 00329 removeIOHandler (fd); 00330 num_removed = true; 00331 DL((REACT,"Detected BAD FD: %d\n", fd )); 00332 } 00333 mask.clear (fd); 00334 } 00335 } 00336 return (num_removed); 00337 }
void Reactor::dispatchHandler | ( | FdSet & | mask_, | |
Fd2Eh_Map_Type & | fdSet_, | |||
EH_IO_Callback | callback_ | |||
) | [private] |
Call handler's callback and, if callback returns negative value, remove it from the Reactor.
This spot needs re-thinking.
When you have several high data-rate connections sending data at the same time, the one that had connected first would get lower FD number and would get data transfer preference over everybody else who has connected later on.
WIN32 HACK: Without having restarted scan from the beginning, this causes crash due to the fact that firing a callback of EventHandler might have invalidated the iterator (happens with Connector's in a sync mode).
Definition at line 568 of file Reactor.cpp.
References ASSA::FdSet::clear(), DL, ASSA::EventHandler::get_id(), ASSA::FdSet::isSet(), ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask.
Referenced by dispatch().
00569 { 00570 trace_with_mask("Reactor::dispatchHandler",REACTTRACE); 00571 00572 int ret = 0; 00573 handler_t fd; 00574 EventHandler* ehp = NULL; 00575 std::string eh_id; 00576 00577 Fd2Eh_Map_Iter iter = fdSet_.begin (); 00578 00579 while (iter != fdSet_.end ()) 00580 { 00581 fd = (*iter).first; 00582 ehp = (*iter).second; 00583 00584 if (mask_.isSet (fd) && ehp != NULL) 00585 { 00586 eh_id = ehp->get_id (); 00587 DL((REACT,"Data detected from \"%s\"(fd=%d)\n", 00588 eh_id.c_str (), fd)); 00589 00590 ret = (ehp->*callback_) (fd); /* Fire up a callback */ 00591 00592 if (ret == -1) { 00593 removeIOHandler (fd); 00594 } 00595 else if (ret > 0) { 00596 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n", 00597 ret, fd, eh_id.c_str ())); 00598 //return; <-- would starve other connections 00599 } 00600 else { 00601 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n", 00602 eh_id.c_str (), fd)); 00603 mask_.clear (fd); 00604 } 00611 iter = fdSet_.begin (); 00612 } 00613 else { 00614 iter++; 00615 } 00616 } 00617 }
Calculate closest timeout.
If TimerQueue is not empty, then return smallest of maxtimeout and first in the queue. Otherwise, return maxtimeout.
maxwait_ | (in) how long we are expected to wait for event(s). | |
howlong_ | (out) how long we are going to wait. |
Definition at line 421 of file Reactor.cpp.
References DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::isEmpty(), m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::top(), trace_with_mask, and ASSA::TimeVal::zeroTime().
Referenced by waitForEvents().
00422 { 00423 trace_with_mask("Reactor::calculateTimeout",REACTTRACE); 00424 00425 TimeVal now; 00426 TimeVal tv; 00427 00428 if (m_tqueue.isEmpty () ) { 00429 howlong_ = maxwait_; 00430 goto done; 00431 } 00432 now = TimeVal::gettimeofday (); 00433 tv = m_tqueue.top (); 00434 00435 if (tv < now) { 00436 /*--- 00437 It took too long to get here (fraction of a millisecond), 00438 and top timer had already expired. In this case, 00439 perform non-blocking select in order to drain the timer queue. 00440 ---*/ 00441 *howlong_ = 0; 00442 } 00443 else { 00444 DL((REACT,"--------- Timer Queue ----------\n")); 00445 m_tqueue.dump(); 00446 DL((REACT,"--------------------------------\n")); 00447 00448 if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) { 00449 *howlong_ = tv - now; 00450 } 00451 else { 00452 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now; 00453 } 00454 } 00455 00456 done: 00457 if (howlong_ != NULL) { 00458 DL((REACT,"delay (%f)\n", double (*howlong_) )); 00459 } 00460 else { 00461 DL((REACT,"delay (forever)\n")); 00462 } 00463 }
int ASSA::Reactor::m_fd_setsize [private] |
Max number of open files per process.
This is the soft limit enforced by the kernel. It can be obtained/manipulated from the shell with ulimit/limit utilities, but may not exceed the hard limit.
Definition at line 200 of file Reactor.h.
Referenced by checkFDs(), and Reactor().
handler_t ASSA::Reactor::m_maxfd_plus1 [private] |
Max file descriptor number (in all sets) plus 1.
This value is ignored by WIN32 implementation of select()
Definition at line 206 of file Reactor.h.
Referenced by adjust_maxfdp1(), registerIOHandler(), and waitForEvents().
bool ASSA::Reactor::m_active [private] |
Flag that indicates whether Reactor is active or had been stopped.
Definition at line 209 of file Reactor.h.
Referenced by deactivate(), handleError(), stopReactor(), and waitForEvents().
Fd2Eh_Map_Type ASSA::Reactor::m_readSet [private] |
Event handlers awaiting on READ_EVENT.
Definition at line 212 of file Reactor.h.
Referenced by checkFDs(), dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().
Fd2Eh_Map_Type ASSA::Reactor::m_writeSet [private] |
Event handlers awaiting on WRITE_EVENT.
Definition at line 215 of file Reactor.h.
Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().
Fd2Eh_Map_Type ASSA::Reactor::m_exceptSet [private] |
Event handlers awaiting on EXCEPT_EVENT.
Definition at line 218 of file Reactor.h.
Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().
MaskSet ASSA::Reactor::m_waitSet [private] |
Handlers to wait for event on.
Definition at line 221 of file Reactor.h.
Referenced by adjust_maxfdp1(), registerIOHandler(), removeHandler(), removeIOHandler(), and waitForEvents().
MaskSet ASSA::Reactor::m_readySet [private] |
Handlers that are ready for processing.
Definition at line 224 of file Reactor.h.
Referenced by dispatch(), isAnyReady(), removeIOHandler(), and waitForEvents().
TimerQueue ASSA::Reactor::m_tqueue [private] |
The queue of Timers.
Definition at line 227 of file Reactor.h.
Referenced by calculateTimeout(), dispatch(), registerTimerHandler(), removeHandler(), removeTimerHandler(), and waitForEvents().