00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _ConnectionHandler_
00022 #define _ConnectionHandler_
00023
00024 #include "ChainableFrameHandler.h"
00025 #include "ConnectionSettings.h"
00026 #include "Sasl.h"
00027 #include "StateManager.h"
00028 #include "qpid/framing/AMQMethodBody.h"
00029 #include "qpid/framing/AMQP_HighestVersion.h"
00030 #include "qpid/framing/AMQP_ClientOperations.h"
00031 #include "qpid/framing/AMQP_ServerProxy.h"
00032 #include "qpid/framing/Array.h"
00033 #include "qpid/framing/enum.h"
00034 #include "qpid/framing/FieldTable.h"
00035 #include "qpid/framing/FrameHandler.h"
00036 #include "qpid/framing/InputHandler.h"
00037 #include "qpid/sys/SecurityLayer.h"
00038 #include "qpid/sys/Timer.h"
00039 #include "qpid/Url.h"
00040 #include <memory>
00041
00042 namespace qpid {
00043 namespace client {
00044
// ConnectionHandler (namespace qpid::client).
//
// Combines several roles through inheritance:
//   - StateManager (private) and the STATES enum below;
//   - ConnectionSettings (public), so the settings fields are part of this object;
//   - ChainableFrameHandler and framing::InputHandler (public) for frame flow;
//   - framing::AMQP_ClientOperations::ConnectionHandler (private), aliased below
//     as ConnectionOperations.
//
// Only declarations are visible here; the member function definitions live
// elsewhere, so behavioural notes below are hedged where this header alone
// cannot confirm them.
//
// NOTE(review): this class uses std::set, std::string, std::vector,
// boost::function and boost::intrusive_ptr, but the visible include list only
// pulls in <memory> and project headers. It presumably relies on transitive
// includes -- confirm, and consider including what is used directly.
00045 class ConnectionHandler : private StateManager,
00046 public ConnectionSettings,
00047 public ChainableFrameHandler,
00048 public framing::InputHandler,
00049 private framing::AMQP_ClientOperations::ConnectionHandler
00050 {
// Short alias for the private AMQP client-operations base class.
00051 typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations;
// State identifiers; presumably the values tracked through the StateManager
// base -- confirm in the implementation file.
00052 enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
// Two sets of int values; presumably groupings of STATES values that are
// filled in by the constructor -- TODO confirm.
00053 std::set<int> ESTABLISHED, FINISHED;
00054
// FrameHandler that forwards every frame it is given to handler.out(f).
// `out` is not declared in this class, so it presumably comes from a base
// class (likely ChainableFrameHandler) -- confirm.
00055 class Adapter : public framing::FrameHandler
00056 {
// The ConnectionHandler that receives forwarded frames. Held by reference,
// so the Adapter must not outlive it.
00057 ConnectionHandler& handler;
00058 public:
// NOTE(review): single-argument constructor is not `explicit`, so a
// ConnectionHandler& converts implicitly to an Adapter.
00059 Adapter(ConnectionHandler& h) : handler(h) {}
// Forwards the frame unchanged to handler.out().
00060 void handle(framing::AMQFrame& f) { handler.out(f); }
00061 };
00062
// Data members. Their declaration order is also their construction order,
// so do not reorder without checking the constructor's initializer list.
00063 Adapter outHandler;
// Proxy object for the server side of the connection; presumably constructed
// on top of outHandler (declared just before it) -- confirm.
00064 framing::AMQP_ServerProxy::Connection proxy;
// Close code and text; presumably recorded when the connection fails or is
// closed with an error -- confirm.
00065 framing::connection::CloseCode errorCode;
00066 std::string errorText;
00067 bool insist;
00068 framing::ProtocolVersion version;
00069 framing::Array capabilities;
00070 framing::FieldTable properties;
// NOTE(review): std::auto_ptr was deprecated in C++11 and removed in C++17;
// these two members and getSecurityLayer() below depend on it.
00071 std::auto_ptr<Sasl> sasl;
00072 std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
// Intrusively ref-counted timer task; assigned via setRcvTimeoutTask()
// presumably -- confirm.
00073 boost::intrusive_ptr<qpid::sys::TimerTask> rcvTimeoutTask;
00074
// Takes an expected state and a message; presumably reports an error using
// msg when the current state differs from s -- confirm.
00075 void checkState(STATES s, const std::string& msg);
00076
00077
// Private handlers named after AMQP connection-class methods. They are
// presumably the implementations of the ConnectionOperations interface
// (whose declaration is not visible here) -- confirm the signatures match.
00078 void start(const framing::FieldTable& serverProperties,
00079 const framing::Array& mechanisms,
00080 const framing::Array& locales);
00081 void secure(const std::string& challenge);
00082 void tune(uint16_t channelMax,
00083 uint16_t frameMax,
00084 uint16_t heartbeatMin,
00085 uint16_t heartbeatMax);
00086 void openOk(const framing::Array& knownHosts);
00087 void redirect(const std::string& host,
00088 const framing::Array& knownHosts);
// Two-argument close: distinct from the public no-argument close() below.
00089 void close(uint16_t replyCode, const std::string& replyText);
00090 void closeOk();
00091 void heartbeat();
00092
00093 public:
// Makes InputHandler::handle nameable through this class's public interface.
00094 using InputHandler::handle;
// Callback taking no arguments.
00095 typedef boost::function<void()> CloseListener;
// Callback taking a 16-bit code and a text message.
00096 typedef boost::function<void(uint16_t, const std::string&)> ErrorListener;
00097
// Constructed from a settings object and a protocol version. The version is
// taken by non-const reference; whether it is modified is not visible here.
00098 ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&);
00099
// Simply forwards the frame to incoming().
00100 void received(framing::AMQFrame& f) { incoming(f); }
00101
// Entry points for frames in each direction (presumably incoming = from the
// broker, outgoing = to the broker -- confirm).
00102 void incoming(framing::AMQFrame& frame);
00103 void outgoing(framing::AMQFrame& frame);
00104
// Presumably blocks until the connection reaches an open or failed state --
// confirm in the implementation.
00105 void waitForOpen();
// No-argument close, callable by users of this class.
00106 void close();
00107 void fail(const std::string& message);
00108
00109
// Const state queries; presumably each tests the current state against the
// correspondingly named STATES value(s) -- confirm.
00110 bool isOpen() const;
00111 bool isClosed() const;
00112 bool isClosing() const;
00113
// Returns an auto_ptr by value, so the caller takes ownership of whatever
// object is returned. Presumably this releases the securityLayer member --
// confirm.
00114 std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer();
00115 void setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask>);
00116
// Public callback slots; callers assign these directly. When they are
// invoked is decided by the implementation file.
00117 CloseListener onClose;
00118 ErrorListener onError;
00119
// Public list of URLs; presumably filled from the knownHosts data supplied
// by the broker -- confirm.
00120 std::vector<Url> knownBrokersUrls;
00121
// Static helper mapping a 16-bit reply code to a framing CloseCode value.
00122 static framing::connection::CloseCode convert(uint16_t replyCode);
00123 };
00124
00125 }}
00126
00127 #endif