00001 #ifndef _broker_BrokerChannel_h
00002 #define _broker_BrokerChannel_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <list>
00026
00027 #include <boost/scoped_ptr.hpp>
00028 #include <boost/shared_ptr.hpp>
00029 #include <boost/ptr_container/ptr_map.hpp>
00030
00031 #include "AccumulatedAck.h"
00032 #include "Consumer.h"
00033 #include "DeliveryRecord.h"
00034 #include "MessageBuilder.h"
00035 #include "NameGenerator.h"
00036 #include "Prefetch.h"
00037 #include "TxBuffer.h"
00038 #include "qpid/framing/ChannelAdapter.h"
00039 #include "qpid/framing/ChannelOpenBody.h"
00040 #include "CompletionHandler.h"
00041
00042 namespace qpid {
00043 namespace broker {
00044
00045 class ConnectionToken;
00046 class Connection;
00047 class Queue;
00048 class BrokerAdapter;
00049
00050 using framing::string;
00051
00056 class Channel : public framing::ChannelAdapter,
00057 public CompletionHandler
00058 {
00059 class ConsumerImpl : public Consumer
00060 {
00061 Channel* parent;
00062 const string tag;
00063 Queue::shared_ptr queue;
00064 ConnectionToken* const connection;
00065 const bool ackExpected;
00066 bool blocked;
00067
00068 public:
00069 ConsumerImpl(Channel* parent, const string& tag,
00070 Queue::shared_ptr queue,
00071 ConnectionToken* const connection, bool ack);
00072 ~ConsumerImpl();
00073 virtual bool deliver(Message::shared_ptr& msg);
00074 void cancel();
00075 void requestDispatch();
00076 };
00077
00078 typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
00079
00080 Connection& connection;
00081 uint64_t currentDeliveryTag;
00082 Queue::shared_ptr defaultQueue;
00083 bool transactional;
00084 ConsumerImplMap consumers;
00085 uint32_t prefetchSize;
00086 uint16_t prefetchCount;
00087 Prefetch outstanding;
00088 uint32_t framesize;
00089 NameGenerator tagGenerator;
00090 std::list<DeliveryRecord> unacked;
00091 sys::Mutex deliveryLock;
00092 TxBuffer txBuffer;
00093 AccumulatedAck accumulatedAck;
00094 MessageStore* const store;
00095 MessageBuilder messageBuilder;
00096 bool opened;
00097 boost::scoped_ptr<BrokerAdapter> adapter;
00098
00099
00100 void complete(Message::shared_ptr msg);
00101
00102 void deliver(Message::shared_ptr& msg, const string& tag,
00103 Queue::shared_ptr& queue, bool ackExpected);
00104 bool checkPrefetch(Message::shared_ptr& msg);
00105
00106 public:
00107 Channel(Connection& parent,
00108 framing::ChannelId id,
00109 uint32_t framesize,
00110 MessageStore* const _store = 0,
00111 uint64_t stagingThreshold = 0);
00112
00113 ~Channel();
00114
00115 bool isOpen() const { return opened; }
00116 BrokerAdapter& getAdatper() { return *adapter; }
00117
00118 void open() { opened = true; }
00119 void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
00120 Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
00121 uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; }
00122 uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; }
00123
00124 bool exists(const string& consumerTag);
00125
00129 void consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
00130 bool exclusive, ConnectionToken* const connection = 0,
00131 const framing::FieldTable* = 0);
00132 void cancel(const string& tag);
00133 bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
00134 void begin();
00135 void close();
00136 void commit();
00137 void rollback();
00138 void ack();
00139 void ack(uint64_t deliveryTag, bool multiple);
00140 void ack(uint64_t deliveryTag, uint64_t endTag);
00141 void recover(bool requeue);
00142 void deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag);
00143 void handlePublish(Message* msg);
00144 void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
00145 void handleContent(boost::shared_ptr<framing::AMQContentBody>);
00146 void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
00147
00148 void handleInlineTransfer(Message::shared_ptr msg);
00149
00150
00151 void handleMethodInContext(
00152 boost::shared_ptr<framing::AMQMethodBody> method,
00153 const framing::MethodContext& context);
00154 };
00155
00156 }}
00157
00158
00159 #endif