00001 #ifndef _broker_BrokerAdapter_h
00002 #define _broker_BrokerAdapter_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "qpid/framing/AMQP_ServerOperations.h"
00022 #include "HandlerImpl.h"
00023 #include "MessageHandlerImpl.h"
00024 #include "qpid/Exception.h"
00025
00026 namespace qpid {
00027 namespace broker {
00028
00029 class Channel;
00030 class Connection;
00031 class Broker;
00032 class ChannelHandler;
00033 class ConnectionHandler;
00034 class BasicHandler;
00035 class ExchangeHandler;
00036 class QueueHandler;
00037 class TxHandler;
00038 class MessageHandler;
00039 class AccessHandler;
00040 class FileHandler;
00041 class StreamHandler;
00042 class DtxHandler;
00043 class TunnelHandler;
00044 class MessageHandlerImpl;
00045
00055 class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
00056 {
00057 public:
00058 BrokerAdapter(Channel& ch, Connection& c, Broker& b);
00059
00060 framing::ProtocolVersion getVersion() const;
00061 ChannelHandler* getChannelHandler() { return &channelHandler; }
00062 ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
00063 BasicHandler* getBasicHandler() { return &basicHandler; }
00064 ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
00065 QueueHandler* getQueueHandler() { return &queueHandler; }
00066 TxHandler* getTxHandler() { return &txHandler; }
00067 MessageHandler* getMessageHandler() { return &messageHandler; }
00068 AccessHandler* getAccessHandler() {
00069 throw ConnectionException(540, "Access class not implemented"); }
00070 FileHandler* getFileHandler() {
00071 throw ConnectionException(540, "File class not implemented"); }
00072 StreamHandler* getStreamHandler() {
00073 throw ConnectionException(540, "Stream class not implemented"); }
00074 DtxHandler* getDtxHandler() {
00075 throw ConnectionException(540, "Dtx class not implemented"); }
00076 TunnelHandler* getTunnelHandler() {
00077 throw ConnectionException(540, "Tunnel class not implemented"); }
00078
00079 framing::AMQP_ClientProxy& getProxy() { return proxy; }
00080
00081 private:
00082
00083 class ConnectionHandlerImpl :
00084 public ConnectionHandler,
00085 public HandlerImpl<framing::AMQP_ClientProxy::Connection>
00086 {
00087 public:
00088 ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
00089
00090 void startOk(const framing::MethodContext& context,
00091 const qpid::framing::FieldTable& clientProperties,
00092 const std::string& mechanism, const std::string& response,
00093 const std::string& locale);
00094 void secureOk(const framing::MethodContext& context,
00095 const std::string& response);
00096 void tuneOk(const framing::MethodContext& context,
00097 uint16_t channelMax,
00098 uint32_t frameMax, uint16_t heartbeat);
00099 void open(const framing::MethodContext& context,
00100 const std::string& virtualHost,
00101 const std::string& capabilities, bool insist);
00102 void close(const framing::MethodContext& context, uint16_t replyCode,
00103 const std::string& replyText,
00104 uint16_t classId, uint16_t methodId);
00105 void closeOk(const framing::MethodContext& context);
00106 };
00107
00108 class ChannelHandlerImpl :
00109 public ChannelHandler,
00110 public HandlerImpl<framing::AMQP_ClientProxy::Channel>
00111 {
00112 public:
00113 ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
00114
00115 void open(const framing::MethodContext& context, const std::string& outOfBand);
00116 void flow(const framing::MethodContext& context, bool active);
00117 void flowOk(const framing::MethodContext& context, bool active);
00118 void ok( const framing::MethodContext& context );
00119 void ping( const framing::MethodContext& context );
00120 void pong( const framing::MethodContext& context );
00121 void resume( const framing::MethodContext& context, const std::string& channelId );
00122 void close(const framing::MethodContext& context, uint16_t replyCode, const
00123 std::string& replyText, uint16_t classId, uint16_t methodId);
00124 void closeOk(const framing::MethodContext& context);
00125 };
00126
00127 class ExchangeHandlerImpl :
00128 public ExchangeHandler,
00129 public HandlerImpl<framing::AMQP_ClientProxy::Exchange>
00130 {
00131 public:
00132 ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
00133
00134 void declare(const framing::MethodContext& context, uint16_t ticket,
00135 const std::string& exchange, const std::string& type,
00136 bool passive, bool durable, bool autoDelete,
00137 bool internal, bool nowait,
00138 const qpid::framing::FieldTable& arguments);
00139 void delete_(const framing::MethodContext& context, uint16_t ticket,
00140 const std::string& exchange, bool ifUnused, bool nowait);
00141 };
00142
00143 class QueueHandlerImpl :
00144 public QueueHandler,
00145 public HandlerImpl<framing::AMQP_ClientProxy::Queue>
00146 {
00147 public:
00148 QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
00149
00150 void declare(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
00151 bool passive, bool durable, bool exclusive,
00152 bool autoDelete, bool nowait,
00153 const qpid::framing::FieldTable& arguments);
00154 void bind(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
00155 const std::string& exchange, const std::string& routingKey,
00156 bool nowait, const qpid::framing::FieldTable& arguments);
00157 void unbind(const framing::MethodContext& context,
00158 uint16_t ticket,
00159 const std::string& queue,
00160 const std::string& exchange,
00161 const std::string& routingKey,
00162 const qpid::framing::FieldTable& arguments );
00163 void purge(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
00164 bool nowait);
00165 void delete_(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
00166 bool ifUnused, bool ifEmpty,
00167 bool nowait);
00168 };
00169
00170 class BasicHandlerImpl :
00171 public BasicHandler,
00172 public HandlerImpl<framing::AMQP_ClientProxy::Basic>
00173 {
00174 public:
00175 BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
00176
00177 void qos(const framing::MethodContext& context, uint32_t prefetchSize,
00178 uint16_t prefetchCount, bool global);
00179 void consume(
00180 const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
00181 const std::string& consumerTag, bool noLocal, bool noAck,
00182 bool exclusive, bool nowait,
00183 const qpid::framing::FieldTable& fields);
00184 void cancel(const framing::MethodContext& context, const std::string& consumerTag,
00185 bool nowait);
00186 void publish(const framing::MethodContext& context, uint16_t ticket,
00187 const std::string& exchange, const std::string& routingKey,
00188 bool mandatory, bool immediate);
00189 void get(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
00190 bool noAck);
00191 void ack(const framing::MethodContext& context, uint64_t deliveryTag, bool multiple);
00192 void reject(const framing::MethodContext& context, uint64_t deliveryTag, bool requeue);
00193 void recover(const framing::MethodContext& context, bool requeue);
00194 };
00195
00196 class TxHandlerImpl :
00197 public TxHandler,
00198 public HandlerImpl<framing::AMQP_ClientProxy::Tx>
00199 {
00200 public:
00201 TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
00202
00203 void select(const framing::MethodContext& context);
00204 void commit(const framing::MethodContext& context);
00205 void rollback(const framing::MethodContext& context);
00206 };
00207
00208 Connection& connection;
00209 BasicHandlerImpl basicHandler;
00210 ChannelHandlerImpl channelHandler;
00211 ConnectionHandlerImpl connectionHandler;
00212 ExchangeHandlerImpl exchangeHandler;
00213 MessageHandlerImpl messageHandler;
00214 QueueHandlerImpl queueHandler;
00215 TxHandlerImpl txHandler;
00216
00217 };
00218 }}
00219
00220
00221
00222 #endif