/home/aconway/svn/qpid/cpp/src/qpid/broker/BrokerQueue.h

00001 #ifndef _broker_BrokerQueue_h
00002 #define _broker_BrokerQueue_h
00003 
00004 /*
00005  *
00006  * Licensed to the Apache Software Foundation (ASF) under one
00007  * or more contributor license agreements.  See the NOTICE file
00008  * distributed with this work for additional information
00009  * regarding copyright ownership.  The ASF licenses this file
00010  * to you under the Apache License, Version 2.0 (the
00011  * "License"); you may not use this file except in compliance
00012  * with the License.  You may obtain a copy of the License at
00013  * 
00014  *   http://www.apache.org/licenses/LICENSE-2.0
00015  * 
00016  * Unless required by applicable law or agreed to in writing,
00017  * software distributed under the License is distributed on an
00018  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
00019  * KIND, either express or implied.  See the License for the
00020  * specific language governing permissions and limitations
00021  * under the License.
00022  *
00023  */
00024 #include <vector>
00025 #include <memory>
00026 #include <queue>
00027 #include <boost/shared_ptr.hpp>
00028 #include "qpid/framing/amqp_types.h"
00029 #include "ConnectionToken.h"
00030 #include "Consumer.h"
00031 #include "BrokerMessage.h"
00032 #include "qpid/framing/FieldTable.h"
00033 #include "qpid/sys/Monitor.h"
00034 #include "PersistableQueue.h"
00035 #include "QueuePolicy.h"
00036 
00037 // TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to
00038 // enforce ownership of Consumers.
00039 
00040 namespace qpid {
00041     namespace broker {
00042         class MessageStore;
00043         class QueueRegistry;
00044 
00048         using std::string;
00049 
00056         class Queue : public PersistableQueue{
                  // A named broker queue: holds a std::queue of Message::shared_ptr
                  // and a vector of registered Consumer pointers, and declares the
                  // persistence hooks grouped under "PersistableQueue support" below.
                  // NOTE(review): only declarations are visible in this header, so
                  // every remark marked "presumably" is an assumption to confirm
                  // against the implementation file.
                  //
                  // Consumers are stored as raw pointers; ownership is not expressed
                  // in the type (see the TODO above the namespace).
00057             typedef std::vector<Consumer*> Consumers;
00058             typedef std::queue<Message::shared_ptr> Messages;
00059             
                  // Construction-time configuration. All four members are const, so
                  // they cannot be reassigned after the constructor runs.
00060             const string name;
                  // NOTE(review): meaning/units of autodelete are not visible here
                  // (flag vs. timeout) -- TODO confirm.
00061             const uint32_t autodelete;
                  // Constructor default is 0, so store may be a null pointer.
00062             MessageStore* const store;
                  // Compared by pointer identity in isExclusiveOwner(); constructor
                  // default is 0, so this may be a null pointer.
00063             const ConnectionToken* const owner;
00064             Consumers consumers;
00065             Messages messages;
                  // NOTE(review): queueing/dispatching/next are presumably dispatch
                  // state (flags plus an index into consumers) -- TODO confirm.
00066             bool queueing;
00067             bool dispatching;
00068             int next;
                  // mutable so that const member functions are able to lock it.
                  // Presumably guards the mutable state above -- TODO confirm which members.
00069             mutable qpid::sys::Mutex lock;
                  // NOTE(review): presumably a timestamp consulted by
                  // canAutoDelete(); units not visible here -- TODO confirm.
00070             int64_t lastUsed;
                  // Non-null when an exclusive consumer is recorded; this is exactly
                  // what hasExclusiveConsumer() tests.
00071             Consumer* exclusive;
                  // mutable because setPersistenceId() is declared const.
00072             mutable uint64_t persistenceId;
                  // NOTE(review): presumably the table handed to create()/configure() -- TODO confirm.
00073             framing::FieldTable settings;
                  // Held through std::auto_ptr, i.e. the queue owns the policy object
                  // (the auto_ptr may also be empty). Exposed read-only via getPolicy().
00074             std::auto_ptr<QueuePolicy> policy;            
00075 
                  // Private helpers; their bodies are not visible in this header.
00076             void pop();
00077             void push(Message::shared_ptr& msg);
00078             bool startDispatching();
                  // Private single-message overload; distinct from the public
                  // no-argument dispatch() declared below.
00079             bool dispatch(Message::shared_ptr& msg);
                  // Takes the auto_ptr by value, so the caller's auto_ptr gives up
                  // ownership of the policy when this is called.
00080             void setPolicy(std::auto_ptr<QueuePolicy> policy);
00081 
00082         public:
00083             
                  // Shared-ownership handle to a Queue. From this point on, the
                  // unqualified name `shared_ptr` inside this class refers to this
                  // typedef (boost::shared_ptr<Queue>).
00084             typedef boost::shared_ptr<Queue> shared_ptr;
00085 
                  // A std::vector of Queue::shared_ptr (the typedef just above).
00086             typedef std::vector<shared_ptr> vector;
00087             
                  // Constructs a queue called `name`. The remaining arguments are
                  // optional and default to 0; they match the const members name,
                  // autodelete, store and owner declared above.
00088             Queue(const string& name, uint32_t autodelete = 0, 
00089                   MessageStore* const store = 0, 
00090                   const ConnectionToken* const owner = 0);
00091             ~Queue();
00092 
                  // Lifecycle operations taking/using a FieldTable of settings.
                  // NOTE(review): presumably create()/destroy() also interact with
                  // the MessageStore when one is set -- TODO confirm.
00093             void create(const qpid::framing::FieldTable& settings);
00094             void configure(const qpid::framing::FieldTable& settings);
00095             void destroy();
                  // NOTE(review): the doxygen numbering jumps before each of the next
                  // four declarations, which suggests their original doc comments were
                  // omitted from this listing; consult the real header for the contract
                  // of deliver/process/recover/dispatch.
                  // Each of deliver/process/recover takes the message handle by
                  // non-const reference.
00100             void deliver(Message::shared_ptr& msg);
00105             void process(Message::shared_ptr& msg);
00109             void recover(Message::shared_ptr& msg);
00116             void dispatch();
                  // Registers / unregisters a consumer. The `exclusive` parameter of
                  // consume() defaults to false and has the same name as the
                  // `exclusive` data member.
00117             void consume(Consumer* c, bool exclusive = false);
00118             void cancel(Consumer* c);
                  // NOTE(review): presumably returns the number of messages removed -- TODO confirm.
00119             uint32_t purge();
00120             uint32_t getMessageCount() const;
00121             uint32_t getConsumerCount() const;
                  // Returns a reference to the internal name member (no copy); the
                  // reference refers to storage inside this Queue object.
00122             inline const string& getName() const { return name; }
                  // True when `o` is the same pointer as the owner passed at
                  // construction (pointer identity, not a deep comparison). Because
                  // owner defaults to 0, a null `o` compares equal for an unowned queue.
                  // The top-level const on the by-value bool return has no effect.
00123             inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
                  // True when the `exclusive` pointer is non-null.
00124             inline bool hasExclusiveConsumer() const { return exclusive; }
00125 
00126             bool canAutoDelete() const;
00127 
                  // enqueue/dequeue with a TransactionContext pointer and a message.
                  // NOTE(review): presumably these record the operation with the
                  // MessageStore under the given transaction -- TODO confirm, including
                  // whether ctxt may be null.
00128             void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
00132             void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg);
                  // No-argument overload returning a message handle.
                  // NOTE(review): behavior on an empty queue is not visible here -- TODO confirm.
00136             Message::shared_ptr dequeue();
00137 
                  // Read-only access to the queue's policy (pointer to const
                  // QueuePolicy). Declared as a non-const member function.
                  // NOTE(review): presumably returns policy.get(), which would be null
                  // when no policy has been set -- TODO confirm.
00138             const QueuePolicy* const getPolicy();
00139 
00140             //PersistableQueue support:
                  // setPersistenceId() is const; this is consistent with
                  // persistenceId being declared mutable above.
00141             uint64_t getPersistenceId() const;
00142             void setPersistenceId(uint64_t persistenceId) const;
00143             void encode(framing::Buffer& buffer) const;
00144             uint32_t encodedSize() const;
00145 
                  // Static counterpart to encode(): takes a QueueRegistry and a
                  // Buffer and returns a Queue::shared_ptr.
                  // NOTE(review): presumably reads the encoded form from `buffer` and
                  // registers/looks up the queue in `queues` -- TODO confirm.
00146             static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer);
00147         };
00148     }
00149 }
00150 
00151 
00152 #endif  

Generated on Tue Apr 17 14:22:03 2007 for Qpid by  doxygen 1.4.7