00001 #ifndef _client_BasicMessageChannel_h
00002 #define _client_BasicMessageChannel_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "MessageChannel.h"
00023 #include "IncomingMessage.h"
00024 #include <boost/scoped_ptr.hpp>
00025
00026 namespace qpid {
00027 namespace client {
00032 class BasicMessageChannel : public MessageChannel
00033 {
00034 public:
00035 BasicMessageChannel(Channel& parent);
00036
00037 void consume(
00038 Queue& queue, std::string& tag, MessageListener* listener,
00039 AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
00040 const framing::FieldTable* fields = 0);
00041
00042 void cancel(const std::string& tag, bool synch = true);
00043
00044 bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
00045
00046 void publish(const Message& msg, const Exchange& exchange,
00047 const std::string& routingKey,
00048 bool mandatory = false, bool immediate = false);
00049
00050 void setReturnedMessageHandler(ReturnedMessageHandler* handler);
00051
00052 void run();
00053
00054 void handle(boost::shared_ptr<framing::AMQMethodBody>);
00055
00056 void handle(shared_ptr<framing::AMQHeaderBody>);
00057
00058 void handle(shared_ptr<framing::AMQContentBody>);
00059
00060 void setQos();
00061
00062 void close();
00063
00064 private:
00065
00066 struct Consumer{
00067 MessageListener* listener;
00068 AckMode ackMode;
00069 int count;
00070 u_int64_t lastDeliveryTag;
00071 };
00072 typedef std::map<std::string, Consumer> ConsumerMap;
00073
00074 void deliver(Consumer& consumer, Message& msg);
00075
00076 sys::Mutex lock;
00077 Channel& channel;
00078 IncomingMessage incoming;
00079 uint64_t incoming_size;
00080 ConsumerMap consumers ;
00081 ReturnedMessageHandler* returnsHandler;
00082 IncomingMessage::WaitableDestination destGet;
00083 IncomingMessage::WaitableDestination destDispatch;
00084 };
00085
00086 }}
00087
00088
00089
00090 #endif