/home/aconway/svn/qpid/cpp/src/qpid/broker/BrokerChannel.h

00001 #ifndef _broker_BrokerChannel_h
00002 #define _broker_BrokerChannel_h
00003 
00004 /*
00005  *
00006  * Licensed to the Apache Software Foundation (ASF) under one
00007  * or more contributor license agreements.  See the NOTICE file
00008  * distributed with this work for additional information
00009  * regarding copyright ownership.  The ASF licenses this file
00010  * to you under the Apache License, Version 2.0 (the
00011  * "License"); you may not use this file except in compliance
00012  * with the License.  You may obtain a copy of the License at
00013  * 
00014  *   http://www.apache.org/licenses/LICENSE-2.0
00015  * 
00016  * Unless required by applicable law or agreed to in writing,
00017  * software distributed under the License is distributed on an
00018  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
00019  * KIND, either express or implied.  See the License for the
00020  * specific language governing permissions and limitations
00021  * under the License.
00022  *
00023  */
00024 
00025 #include <list>
00026 
00027 #include <boost/scoped_ptr.hpp>
00028 #include <boost/shared_ptr.hpp>
00029 #include <boost/ptr_container/ptr_map.hpp>
00030 
00031 #include "AccumulatedAck.h"
00032 #include "Consumer.h"
00033 #include "DeliveryRecord.h"
00034 #include "MessageBuilder.h"
00035 #include "NameGenerator.h"
00036 #include "Prefetch.h"
00037 #include "TxBuffer.h"
00038 #include "qpid/framing/ChannelAdapter.h"
00039 #include "qpid/framing/ChannelOpenBody.h"
00040 #include "CompletionHandler.h"
00041 
00042 namespace qpid {
00043 namespace broker {
00044 
00045 class ConnectionToken;  // forward declaration: appears in this header only as ConnectionToken*
00046 class Connection;       // forward declaration: appears in this header only as Connection&
00047 class Queue;            // NOTE(review): Queue::shared_ptr is used below, which needs the complete type; presumably one of the includes above provides it -- confirm
00048 class BrokerAdapter;    // forward declaration: Channel holds it via boost::scoped_ptr and returns it by reference
00049 
00050 using framing::string;  // lets declarations in qpid::broker write `string` for framing::string
00051 
00056 class Channel : public framing::ChannelAdapter,
00057                 public CompletionHandler
00058 {
          // Broker-side channel belonging to one Connection.
          //
          // This header only declares the per-channel state (consumers,
          // prefetch settings, un-acked deliveries, transaction buffer,
          // in-progress message builder) and the operations on it; apart
          // from the small inline accessors below, the behaviour lives in
          // the implementation file, which is not visible here.

          // Consumer implementation private to Channel. Each instance keeps
          // a back-pointer to its Channel together with the tag, queue,
          // connection token and ack flag it was constructed with.
00059     class ConsumerImpl : public Consumer
00060     {
00061         Channel* parent;
00062         const string tag;
00063         Queue::shared_ptr queue;
00064         ConnectionToken* const connection;
00065         const bool ackExpected;
00066         bool blocked;  // NOTE(review): presumably set while delivery is held back -- confirm in the .cpp
00067 
00068       public:
00069         ConsumerImpl(Channel* parent, const string& tag,
00070                      Queue::shared_ptr queue,
00071                      ConnectionToken* const connection, bool ack);
00072         ~ConsumerImpl();
              // NOTE(review): presumably overrides Consumer::deliver -- confirm against Consumer.h
00073         virtual bool deliver(Message::shared_ptr& msg);            
00074         void cancel();
00075         void requestDispatch();
00076     };
00077 
          // Consumers keyed by string (presumably the consumer tag -- confirm).
          // boost::ptr_map owns the ConsumerImpl objects stored in it.
00078     typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
00079 
          // NOTE(review): members are constructed in the order declared here;
          // keep this order in sync with the constructor's initialiser list.
00080     Connection& connection;
00081     uint64_t currentDeliveryTag;
00082     Queue::shared_ptr defaultQueue;  // read/written by get/setDefaultQueue()
00083     bool transactional;
00084     ConsumerImplMap consumers;
00085     uint32_t prefetchSize;    // written by setPrefetchSize()
00086     uint16_t prefetchCount;    // written by setPrefetchCount()
00087     Prefetch outstanding;
00088     uint32_t framesize;
00089     NameGenerator tagGenerator;
00090     std::list<DeliveryRecord> unacked;
00091     sys::Mutex deliveryLock;  // NOTE(review): which members this guards is not visible here -- confirm
00092     TxBuffer txBuffer;
00093     AccumulatedAck accumulatedAck;
00094     MessageStore* const store;  // pointer itself is const; may be null (constructor default is 0)
00095     MessageBuilder messageBuilder;//builder for in-progress message
00096     bool opened;  // set by open(), reported by isOpen()
00097     boost::scoped_ptr<BrokerAdapter> adapter;  // dereferenced by getAdatper()
00098 
00099         // completion handler for MessageBuilder
00100     void complete(Message::shared_ptr msg);
00101     
          // Private overload; distinct from the public
          // deliver(msg, consumerTag, deliveryTag) declared further down.
00102     void deliver(Message::shared_ptr& msg, const string& tag,
00103                  Queue::shared_ptr& queue, bool ackExpected);            
00104     bool checkPrefetch(Message::shared_ptr& msg);
00105         
00106   public:
          // _store and stagingThreshold are optional and both default to 0.
00107     Channel(Connection& parent,
00108             framing::ChannelId id,
00109             uint32_t framesize, 
00110             MessageStore* const _store = 0,
00111             uint64_t stagingThreshold = 0);
00112     
00113     ~Channel();
00114 
          // Returns the `opened` flag.
00115     bool isOpen() const { return opened; }
          // Returns the object held by `adapter` (dereferences the scoped_ptr).
          // NOTE(review): the name looks like a typo for getAdapter; it is
          // part of the public interface, so renaming needs a caller audit.
00116     BrokerAdapter& getAdatper() { return *adapter; }
00117     
          // Sets the `opened` flag to true.
00118     void open() { opened = true; }
00119     void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
00120     Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
          // Both prefetch setters store the new value and return it.
00121     uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; }
00122     uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; }
00123 
00124     bool exists(const string& consumerTag);
00125 
          // tagInOut is taken by non-const reference, so the callee can
          // modify it. NOTE(review): presumably an empty tag is replaced by a
          // generated one -- confirm in the .cpp. The connection token and the
          // unnamed FieldTable pointer are optional and default to 0.
00129     void consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
00130                  bool exclusive, ConnectionToken* const connection = 0,
00131                  const framing::FieldTable* = 0);
00132     void cancel(const string& tag);
00133     bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
00134     void begin();
00135     void close();
00136     void commit();
00137     void rollback();
          // Three ack overloads: no arguments, (deliveryTag, multiple) and
          // (deliveryTag, endTag).
00138     void ack();
00139     void ack(uint64_t deliveryTag, bool multiple);
00140     void ack(uint64_t deliveryTag, uint64_t endTag);
00141     void recover(bool requeue);
00142     void deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag);            
          // handlePublish takes a raw Message*.
          // NOTE(review): ownership of msg is not visible from this header -- confirm.
00143     void handlePublish(Message* msg);
00144     void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
00145     void handleContent(boost::shared_ptr<framing::AMQContentBody>);
00146     void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
00147     
00148     void handleInlineTransfer(Message::shared_ptr msg);
00149     
00150     // For ChannelAdapter
00151     void handleMethodInContext(
00152         boost::shared_ptr<framing::AMQMethodBody> method,
00153         const framing::MethodContext& context);
00154 };
00155 
00156 }} // namespace broker
00157 
00158 
00159 #endif  

Generated on Tue Apr 17 14:22:03 2007 for Qpid by  doxygen 1.4.7