00001 #ifndef _broker_BrokerMessageBase_h
00002 #define _broker_BrokerMessageBase_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <string>
00026 #include <boost/shared_ptr.hpp>
00027 #include "Content.h"
00028 #include "PersistableMessage.h"
00029 #include "qpid/framing/amqp_types.h"
00030
00031 namespace qpid {
00032
// Forward declarations of the framing types used by the Message
// interface. Each one appears only as a pointer, a reference or a
// boost::shared_ptr target, so the full definitions are not needed here.
namespace framing {
    class AMQContentBody;
    class AMQHeaderBody;
    class AMQMethodBody;
    class BasicHeaderProperties;
    class ChannelAdapter;
    class FieldTable;
    class MethodContext;
}
00042
00043
00044 namespace broker {
// Broker types that Message refers to only through pointers, so a
// forward declaration is sufficient.
class MessageStore;
class ConnectionToken;
00047
00053 class Message : public PersistableMessage{
00054 public:
00055 typedef boost::shared_ptr<Message> shared_ptr;
00056 typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr;
00057
00058
00059 Message(const ConnectionToken* publisher_,
00060 const std::string& _exchange,
00061 const std::string& _routingKey,
00062 bool _mandatory, bool _immediate,
00063 AMQMethodBodyPtr respondTo_) :
00064 publisher(publisher_),
00065 exchange(_exchange),
00066 routingKey(_routingKey),
00067 mandatory(_mandatory),
00068 immediate(_immediate),
00069 persistenceId(0),
00070 redelivered(false),
00071 respondTo(respondTo_)
00072 {}
00073
00074 Message() :
00075 mandatory(false),
00076 immediate(false),
00077 persistenceId(0),
00078 redelivered(false)
00079 {}
00080
00081 virtual ~Message() {};
00082
00083
00084 const std::string& getRoutingKey() const { return routingKey; }
00085 const std::string& getExchange() const { return exchange; }
00086 uint64_t getPersistenceId() const { return persistenceId; }
00087 bool getRedelivered() const { return redelivered; }
00088 AMQMethodBodyPtr getRespondTo() const { return respondTo; }
00089
00090 void setRouting(const std::string& _exchange, const std::string& _routingKey)
00091 { exchange = _exchange; routingKey = _routingKey; }
00092 void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
00093 void redeliver() { redelivered = true; }
00094
00098 virtual void deliver(framing::ChannelAdapter& channel,
00099 const std::string& consumerTag,
00100 uint64_t deliveryTag,
00101 uint32_t framesize) = 0;
00105 virtual void sendGetOk(const framing::MethodContext& context,
00106 const std::string& destination,
00107 uint32_t messageCount,
00108 uint64_t deliveryTag,
00109 uint32_t framesize) = 0;
00110
00111 virtual bool isComplete() = 0;
00112
00113 virtual uint64_t contentSize() const = 0;
00114 virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
00115 virtual const framing::FieldTable& getApplicationHeaders() = 0;
00116 virtual bool isPersistent() = 0;
00117 virtual const ConnectionToken* getPublisher() const {
00118 return publisher;
00119 }
00120
00121 virtual void encode(framing::Buffer& buffer) const = 0;
00122 virtual void encodeHeader(framing::Buffer& buffer) const = 0;
00123
00128 virtual uint32_t encodedSize() const = 0;
00134 virtual uint32_t encodedHeaderSize() const = 0;
00139 virtual uint32_t encodedContentSize() const = 0;
00144 virtual uint64_t expectedContentSize() = 0;
00145
00146 virtual void decodeHeader(framing::Buffer& buffer) = 0;
00147 virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0;
00148
00149 static shared_ptr decode(framing::Buffer& buffer);
00150
00151
00152
00158 virtual void setContent(std::auto_ptr<Content>& ) {};
00159 virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {};
00160 virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {};
00166 virtual void releaseContent(MessageStore* ) {};
00167
00168 private:
00169 const ConnectionToken* publisher;
00170 std::string exchange;
00171 std::string routingKey;
00172 const bool mandatory;
00173 const bool immediate;
00174 mutable uint64_t persistenceId;
00175 bool redelivered;
00176 AMQMethodBodyPtr respondTo;
00177 };
00178
00179 }}
00180
00181
00182 #endif