00001 #ifndef _broker_BrokerQueue_h
00002 #define _broker_BrokerQueue_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <vector>
00025 #include <memory>
00026 #include <queue>
00027 #include <boost/shared_ptr.hpp>
00028 #include "qpid/framing/amqp_types.h"
00029 #include "ConnectionToken.h"
00030 #include "Consumer.h"
00031 #include "BrokerMessage.h"
00032 #include "qpid/framing/FieldTable.h"
00033 #include "qpid/sys/Monitor.h"
00034 #include "PersistableQueue.h"
00035 #include "QueuePolicy.h"
00036
00037
00038
00039
00040 namespace qpid {
00041 namespace broker {
00042 class MessageStore;
00043 class QueueRegistry;
00044
00048 using std::string;
00049
00056 class Queue : public PersistableQueue{
00057 typedef std::vector<Consumer*> Consumers;
00058 typedef std::queue<Message::shared_ptr> Messages;
00059
00060 const string name;
00061 const uint32_t autodelete;
00062 MessageStore* const store;
00063 const ConnectionToken* const owner;
00064 Consumers consumers;
00065 Messages messages;
00066 bool queueing;
00067 bool dispatching;
00068 int next;
00069 mutable qpid::sys::Mutex lock;
00070 int64_t lastUsed;
00071 Consumer* exclusive;
00072 mutable uint64_t persistenceId;
00073 framing::FieldTable settings;
00074 std::auto_ptr<QueuePolicy> policy;
00075
00076 void pop();
00077 void push(Message::shared_ptr& msg);
00078 bool startDispatching();
00079 bool dispatch(Message::shared_ptr& msg);
00080 void setPolicy(std::auto_ptr<QueuePolicy> policy);
00081
00082 public:
00083
00084 typedef boost::shared_ptr<Queue> shared_ptr;
00085
00086 typedef std::vector<shared_ptr> vector;
00087
00088 Queue(const string& name, uint32_t autodelete = 0,
00089 MessageStore* const store = 0,
00090 const ConnectionToken* const owner = 0);
00091 ~Queue();
00092
00093 void create(const qpid::framing::FieldTable& settings);
00094 void configure(const qpid::framing::FieldTable& settings);
00095 void destroy();
00100 void deliver(Message::shared_ptr& msg);
00105 void process(Message::shared_ptr& msg);
00109 void recover(Message::shared_ptr& msg);
00116 void dispatch();
00117 void consume(Consumer* c, bool exclusive = false);
00118 void cancel(Consumer* c);
00119 uint32_t purge();
00120 uint32_t getMessageCount() const;
00121 uint32_t getConsumerCount() const;
00122 inline const string& getName() const { return name; }
00123 inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
00124 inline bool hasExclusiveConsumer() const { return exclusive; }
00125
00126 bool canAutoDelete() const;
00127
00128 void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
00132 void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg);
00136 Message::shared_ptr dequeue();
00137
00138 const QueuePolicy* const getPolicy();
00139
00140
00141 uint64_t getPersistenceId() const;
00142 void setPersistenceId(uint64_t persistenceId) const;
00143 void encode(framing::Buffer& buffer) const;
00144 uint32_t encodedSize() const;
00145
00146 static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer);
00147 };
00148 }
00149 }
00150
00151
00152 #endif