00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _Connector_
00022 #define _Connector_
00023
00024
00025 #include "qpid/framing/InputHandler.h"
00026 #include "qpid/framing/OutputHandler.h"
00027 #include "qpid/framing/InitiationHandler.h"
00028 #include "qpid/framing/ProtocolInitiation.h"
00029 #include "qpid/framing/ProtocolVersion.h"
00030 #include "qpid/sys/ShutdownHandler.h"
00031 #include "qpid/sys/TimeoutHandler.h"
00032 #include "qpid/sys/Thread.h"
00033 #include "qpid/sys/Monitor.h"
00034 #include "qpid/sys/Socket.h"
00035
00036 namespace qpid {
00037
00038 namespace client {
00039
00040 class Connector : public framing::OutputHandler, // qpid::client connector: declares connect/init/close/send and holds a sys::Socket, two framing::Buffer objects and a sys::Thread.
00041 private sys::Runnable // Private base; run() below is presumably the Runnable entry point - TODO confirm against qpid/sys headers.
00042 { // Members are private (class default) until the `public:` label. Member declaration order determines construction order; do not reorder.
00043 const bool debug; // Fixed at construction (const); presumably taken from the constructor's `debug` argument - confirm in the .cpp.
00044 const int receive_buffer_size; // Fixed at construction (const); relation to the constructor's `buffer_size` is not visible here.
00045 const int send_buffer_size; // Fixed at construction (const); relation to the constructor's `buffer_size` is not visible here.
00046 framing::ProtocolVersion version; // Presumably a copy of the constructor's `pVersion` - confirm in the .cpp.
00047
00048 bool closed; // Closed-state flag; see markClosed()/handleClosed() declared below.
00049 sys::Mutex closedLock; // NOTE(review): presumably guards `closed` - confirm in the implementation.
00050
00051 int64_t lastIn; // Presumably a timestamp of the last inbound activity; units not visible in this header.
00052 int64_t lastOut; // Presumably a timestamp of the last outbound activity; units not visible in this header.
00053 int64_t timeout; // Presumably the socket timeout applied by setSocketTimeout() - TODO confirm.
00054 uint32_t idleIn; // Presumably configured through setReadTimeout() - TODO confirm.
00055 uint32_t idleOut; // Presumably configured through setWriteTimeout() - TODO confirm.
00056
00057 sys::TimeoutHandler* timeoutHandler; // Raw pointer; see setTimeoutHandler(). Ownership/lifetime not visible in this header.
00058 sys::ShutdownHandler* shutdownHandler; // Raw pointer; see setShutdownHandler(). Ownership/lifetime not visible in this header.
00059 framing::InputHandler* input; // Raw pointer; see setInputHandler(). Ownership/lifetime not visible in this header.
00060 framing::InitiationHandler* initialiser; // Raw pointer; no public setter for it is declared in this class.
00061 framing::OutputHandler* output; // Raw pointer; relation to getOutputHandler() is not visible in this header.
00062
00063 framing::Buffer inbuf; // Buffer object, presumably for received bytes - confirm in the .cpp.
00064 framing::Buffer outbuf; // Buffer object, presumably for bytes to send - confirm in the .cpp.
00065
00066 sys::Mutex writeLock; // NOTE(review): presumably serialises writes (writeBlock/writeToSocket) - confirm in the implementation.
00067 sys::Thread receiver; // Thread handle; presumably the thread that executes run() - TODO confirm.
00068
00069 sys::Socket socket; // Socket object held by value.
00070
00071 void checkIdle(ssize_t status); // Private helper. NOTE(review): ssize_t has no direct include in this header's visible include list.
00072 void writeBlock(framing::AMQDataBlock* data); // Private helper taking a data block by raw pointer; ownership not visible here.
00073 void writeToSocket(char* data, size_t available); // Private helper taking a byte pointer and a byte count.
00074 void setSocketTimeout(); // Private helper; takes no arguments, so it must work from member state.
00075
00076 void run(); // Private; presumably implements sys::Runnable::run - TODO confirm.
00077 void handleClosed(); // Private helper; behaviour not visible in this header.
00078 bool markClosed(); // Private helper returning bool; meaning of the result is not visible in this header.
00079
00080 friend class Channel; // Grants class Channel access to all private members declared above.
00081 public:
00082 Connector(framing::ProtocolVersion pVersion, // NOTE(review): callable with one argument and not `explicit`, so ProtocolVersion converts implicitly to Connector.
00083 bool debug = false, uint32_t buffer_size = 1024); // Defaults: debug off, buffer_size 1024 (units not stated in this header).
00084 virtual ~Connector(); // Virtual destructor, so deleting through a base pointer is well-defined.
00085 virtual void connect(const std::string& host, int port); // Takes host name and port. NOTE(review): <string> has no direct include in the visible include list.
00086 virtual void init(); // Behaviour not visible in this header; see the implementation.
00087 virtual void close(); // Behaviour not visible in this header; see closed/closedLock above.
00088 virtual void setInputHandler(framing::InputHandler* handler); // Presumably stores `handler` in `input`; pointer is not owned as far as this header shows.
00089 virtual void setTimeoutHandler(sys::TimeoutHandler* handler); // Presumably stores `handler` in `timeoutHandler` - confirm in the .cpp.
00090 virtual void setShutdownHandler(sys::ShutdownHandler* handler); // Presumably stores `handler` in `shutdownHandler` - confirm in the .cpp.
00091 virtual framing::OutputHandler* getOutputHandler(); // Returns an OutputHandler pointer; Connector itself derives from framing::OutputHandler.
00092 virtual void send(framing::AMQFrame* frame); // Takes a frame by raw pointer; presumably the framing::OutputHandler interface method - TODO confirm. Ownership not visible here.
00093 virtual void setReadTimeout(uint16_t timeout); // 16-bit timeout value; units not stated in this header.
00094 virtual void setWriteTimeout(uint16_t timeout); // 16-bit timeout value; units not stated in this header.
00095 };
00096
00097 }}
00098
00099
00100 #endif