00001 #ifndef _client_ClientChannel_h
00002 #define _client_ClientChannel_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <boost/scoped_ptr.hpp>
00025 #include "qpid/framing/amqp_framing.h"
00026 #include "ClientExchange.h"
00027 #include "ClientMessage.h"
00028 #include "ClientQueue.h"
00029 #include "ResponseHandler.h"
00030 #include "qpid/framing/ChannelAdapter.h"
00031 #include "qpid/sys/Thread.h"
00032 #include "AckMode.h"
00033
00034 namespace qpid {
00035
00036 namespace framing {
00037 class ChannelCloseBody;
00038 class AMQMethodBody;
00039 }
00040
00041 namespace client {
00042
00043 class Connection;
00044 class MessageChannel;
00045 class MessageListener;
00046 class ReturnedMessageHandler;
00047
00055 class Channel : public framing::ChannelAdapter
00056 {
00057 private:
00058 struct UnknownMethod {};
00059 typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
00060
00061 sys::Mutex lock;
00062 boost::scoped_ptr<MessageChannel> messaging;
00063 Connection* connection;
00064 sys::Thread dispatcher;
00065 ResponseHandler responses;
00066
00067 uint16_t prefetch;
00068 const bool transactional;
00069 framing::ProtocolVersion version;
00070
00071 void handleHeader(framing::AMQHeaderBody::shared_ptr body);
00072 void handleContent(framing::AMQContentBody::shared_ptr body);
00073 void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
00074 void handleMethodInContext(
00075 framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
00076 void handleChannel(framing::AMQMethodBody::shared_ptr method);
00077 void handleConnection(framing::AMQMethodBody::shared_ptr method);
00078
00079 void setQos();
00080
00081 void protocolInit(
00082 const std::string& uid, const std::string& pwd,
00083 const std::string& vhost);
00084
00085 framing::AMQMethodBody::shared_ptr sendAndReceive(
00086 framing::AMQMethodBody::shared_ptr,
00087 framing::ClassId, framing::MethodId);
00088
00089 framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
00090 bool sync,
00091 framing::AMQMethodBody::shared_ptr,
00092 framing::ClassId, framing::MethodId);
00093
00094 template <class BodyType>
00095 boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
00096 return boost::shared_polymorphic_downcast<BodyType>(
00097 sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID));
00098 }
00099
00100 template <class BodyType>
00101 boost::shared_ptr<BodyType> sendAndReceiveSync(
00102 bool sync, framing::AMQMethodBody::shared_ptr body) {
00103 return boost::shared_polymorphic_downcast<BodyType>(
00104 sendAndReceiveSync(
00105 sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
00106 }
00107
00108 void open(framing::ChannelId, Connection&);
00109 void closeInternal();
00110 void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
00111
00112
00113 friend class Connection;
00114 friend class BasicMessageChannel;
00115 friend class MessageMessageChannel;
00116
00117 public:
00118 enum InteropMode { AMQP_08, AMQP_09 };
00119
00135 Channel(
00136 bool transactional = false, u_int16_t prefetch = 500,
00137 InteropMode=AMQP_08);
00138
00139 ~Channel();
00140
00155 void declareExchange(Exchange& exchange, bool synch = true);
00164 void deleteExchange(Exchange& exchange, bool synch = true);
00173 void declareQueue(Queue& queue, bool synch = true);
00182 void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
00201 void bind(const Exchange& exchange, const Queue& queue,
00202 const std::string& key, const framing::FieldTable& args,
00203 bool synch = true);
00204
00217 void commit();
00218
00225 void rollback();
00226
00230 void setPrefetch(uint16_t prefetch);
00231
00232 uint16_t getPrefetch() { return prefetch; }
00233
00237 void start();
00238
00243 void close(
00244 framing::ReplyCode = 200, const std::string& ="OK",
00245 framing::ClassId = 0, framing::MethodId = 0);
00246
00248 bool isTransactional() { return transactional; }
00249
00251 bool isOpen() const;
00252
00254 Connection& getConnection() { return *connection; }
00255
00257 framing::ProtocolVersion getVersion() const { return version ; }
00258
00286 void consume(
00287 Queue& queue, std::string& tag, MessageListener* listener,
00288 AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
00289 const framing::FieldTable* fields = 0);
00290
00300 void cancel(const std::string& tag, bool synch = true);
00315 bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
00316
00334 void publish(const Message& msg, const Exchange& exchange,
00335 const std::string& routingKey,
00336 bool mandatory = false, bool immediate = false);
00337
00344 void setReturnedMessageHandler(ReturnedMessageHandler* handler);
00345
00349 void run();
00350
00351
00352 };
00353
00354 }}
00355
00356 #endif