00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _LFProcessor_
00022 #define _LFProcessor_
00023
00024 #include <apr_poll.h>
00025 #include <iostream>
00026 #include <vector>
00027 #include "qpid/sys/Monitor.h"
00028 #include "qpid/sys/Runnable.h"
00029 #include "qpid/sys/Thread.h"
00030
00031 namespace qpid {
00032 namespace sys {
00033
00034 class LFSessionContext;
00035
00042 class LFProcessor : private virtual qpid::sys::Runnable
00043 {
00044 typedef std::vector<LFSessionContext*>::iterator iterator;
00045
00046 const int size;
00047 const apr_interval_time_t timeout;
00048 apr_pollset_t* pollset;
00049 int signalledCount;
00050 int current;
00051 const apr_pollfd_t* signalledFDs;
00052 int count;
00053 const int workerCount;
00054 bool hasLeader;
00055 qpid::sys::Thread* workers;
00056 qpid::sys::Monitor leadLock;
00057 qpid::sys::Mutex countLock;
00058 std::vector<LFSessionContext*> sessions;
00059 volatile bool stopped;
00060
00061 const apr_pollfd_t* getNextEvent();
00062 void waitToLead();
00063 void relinquishLead();
00064 void poll();
00065 virtual void run();
00066
00067 public:
00068 LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
00073 void add(const apr_pollfd_t* const fd);
00077 void remove(const apr_pollfd_t* const fd);
00082 void update(const apr_pollfd_t* const fd);
00086 void reactivate(const apr_pollfd_t* const fd);
00091 void deactivate(const apr_pollfd_t* const fd);
00096 bool full();
00100 bool empty();
00104 void stop();
00108 void start();
00112 bool isStopped();
00113
00114 ~LFProcessor();
00115 };
00116
00117 }
00118 }
00119
00120
00121 #endif