00001 #ifndef _broker_BrokerMessage_h
00002 #define _broker_BrokerMessage_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <memory>
00026 #include <boost/shared_ptr.hpp>
00027
00028 #include "BrokerMessageBase.h"
00029 #include "qpid/framing/BasicHeaderProperties.h"
00030 #include "ConnectionToken.h"
00031 #include "Content.h"
00032 #include "qpid/sys/Mutex.h"
00033 #include "TxBuffer.h"
00034
00035 namespace qpid {
00036
// Forward declarations of framing types that the declarations below use only
// by reference (MethodContext, ChannelAdapter) or as the target of a
// boost::shared_ptr (AMQHeaderBody), so an incomplete type is sufficient here.
00037 namespace framing {
00038 class MethodContext;
00039 class ChannelAdapter;
00040 class AMQHeaderBody;
00041 }
00042
00043 namespace broker {
00044
// MessageStore is only referenced through a pointer in this header
// (BasicMessage::releaseContent), so a forward declaration is enough.
00045 class MessageStore;
// Lets declarations in qpid::broker write `string` for framing::string.
// NOTE(review): this using-declaration sits in a header, so it is visible to
// every translation unit that includes this file.
00046 using framing::string;
00047
/**
 * Broker message type that publicly derives from Message and carries a shared
 * AMQHeaderBody, an owned Content object and a content size counter.
 *
 * Only declarations are visible in this header; except for contentSize(),
 * the member functions are defined elsewhere, so the per-member notes below
 * describe signatures and hedge anything about runtime behaviour.
 * NOTE(review): Message is presumably declared in BrokerMessageBase.h - confirm.
 */
00053 class BasicMessage : public Message {
      /** Header body, held through a boost::shared_ptr (see setHeader()). */
00054 boost::shared_ptr<framing::AMQHeaderBody> header;
      /**
       * Content object owned through std::auto_ptr (see setContent()).
       * Note: std::auto_ptr was deprecated in C++11 and removed in C++17.
       */
00055 std::auto_ptr<Content> content;
      /**
       * Mutex declared mutable, so it can be used from const member functions.
       * NOTE(review): presumably guards access to `content` - confirm in the .cpp.
       */
00056 mutable sys::Mutex contentLock;
      /** Value reported by contentSize(). */
00057 uint64_t size;
00058
      /**
       * Private helper taking the target channel adapter and a frame size.
       * NOTE(review): presumably emits the content in frames no larger than
       * framesize - the definition is not visible here.
       */
00059 void sendContent(framing::ChannelAdapter&, uint32_t framesize);
00060
00061 public:
      /** Convenience alias for a boost::shared_ptr to a BasicMessage. */
00062 typedef boost::shared_ptr<BasicMessage> shared_ptr;
00063
      /**
       * Constructs a message from the publishing connection token, the
       * exchange name, the routing key, the mandatory/immediate flags and a
       * method body to respond to.
       * NOTE(review): how these arguments are stored (presumably forwarded to
       * the Message base) is not visible in this header.
       */
00064 BasicMessage(const ConnectionToken* const publisher,
00065 const string& exchange, const string& routingKey,
00066 bool mandatory, bool immediate,
00067 boost::shared_ptr<framing::AMQMethodBody> respondTo);
      /** Default constructor; initial member values are set in the definition. */
00068 BasicMessage();
      /** Destructor; defined out of line. */
00069 ~BasicMessage();
      /** Supplies the header body (passed as a shared pointer, by value). */
00070 void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header);
      /** Supplies one content body (passed as a shared pointer, by value). */
00071 void addContent(framing::AMQContentBody::shared_ptr data);
      /**
       * Returns a bool completeness flag.
       * NOTE(review): presumably true once header and all expected content
       * have been received - confirm against the definition.
       */
00072 bool isComplete();
00073
      /**
       * Sends the message out through the given channel adapter for the
       * consumer identified by consumerTag, using deliveryTag and framesize.
       * Exact wire behaviour is defined in the .cpp.
       */
00074 void deliver(framing::ChannelAdapter&,
00075 const string& consumerTag,
00076 uint64_t deliveryTag,
00077 uint32_t framesize);
00078
      /**
       * Sends a get-ok style response in the given method context.
       * NOTE(review): `destination` is spelled std::string here while sibling
       * declarations use `string` (framing::string) - presumably the same
       * type; confirm.
       */
00079 void sendGetOk(const framing::MethodContext&,
00080 const std::string& destination,
00081 uint32_t messageCount,
00082 uint64_t deliveryTag,
00083 uint32_t framesize);
00084
      /**
       * Returns a raw pointer to the basic header properties.
       * NOTE(review): ownership/lifetime of the pointee is not visible here;
       * presumably it belongs to the header body - confirm.
       */
00085 framing::BasicHeaderProperties* getHeaderProperties();
      /** Returns a const reference to the application headers field table. */
00086 const framing::FieldTable& getApplicationHeaders();
      /** Returns a bool persistence flag; the criteria are defined in the .cpp. */
00087 bool isPersistent();
      /** Returns the current value of the `size` member. */
00088 uint64_t contentSize() const { return size; }
00089
      /**
       * Decodes the message from buffer. headersOnly defaults to false and
       * contentChunkSize defaults to 0.
       * NOTE(review): the meaning of contentChunkSize == 0 is not visible
       * here (presumably "no chunking") - confirm.
       */
00090 void decode(framing::Buffer& buffer, bool headersOnly = false,
00091 uint32_t contentChunkSize = 0);
      /** Decodes only the header portion from buffer. */
00092 void decodeHeader(framing::Buffer& buffer);
      /** Decodes only the content portion from buffer; contentChunkSize defaults to 0. */
00093 void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
00094
      /** Encodes the message into buffer; const member. */
00095 void encode(framing::Buffer& buffer) const;
      /** Encodes only the header portion into buffer; const member. */
00096 void encodeHeader(framing::Buffer& buffer) const;
      /** Encodes only the content portion into buffer; const member. */
00097 void encodeContent(framing::Buffer& buffer) const;
      /** Returns a 32-bit encoded size for the message (presumably what encode() writes). */
00102 uint32_t encodedSize() const;
      /** Returns a 32-bit encoded size for the header portion. */
00108 uint32_t encodedHeaderSize() const;
      /** Returns a 32-bit encoded size for the content portion. */
00113 uint32_t encodedContentSize() const;
      /**
       * Releases the content with the help of the given store.
       * NOTE(review): presumably drops the in-memory content so it can be
       * reloaded from the store later - confirm; null handling is not
       * visible here.
       */
00119 void releaseContent(MessageStore* store);
      /**
       * Returns a 64-bit expected content size.
       * NOTE(review): presumably the body size announced by the header - confirm.
       */
00124 uint64_t expectedContentSize();
      /**
       * Replaces the content; the argument is a non-const reference to a
       * std::auto_ptr.
       * NOTE(review): presumably ownership is transferred out of the caller's
       * auto_ptr, leaving it empty - confirm in the definition.
       */
00130 void setContent(std::auto_ptr<Content>& content);
00131 };
00132
00133 }
00134 }
00135
00136
00137 #endif