00001 #ifndef _tests_InProcessBroker_h
00002 #define _tests_InProcessBroker_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/client/Connector.h"
#include "qpid/client/Connection.h"

#include <algorithm>
#include <cassert>
#include <iostream>
#include <iterator>
#include <vector>
00031
00032
00033 namespace qpid {
00034 namespace broker {
00035
00045 class InProcessBroker : public client::Connector {
00046 public:
00047 enum Sender {CLIENT,BROKER};
00048
00050 struct TaggedFrame {
00051 TaggedFrame(Sender e, framing::AMQFrame* f) : frame(f), sender(e) {}
00052 bool fromBroker() const { return sender == BROKER; }
00053 bool fromClient() const { return sender == CLIENT; }
00054
00055 template <class MethodType>
00056 MethodType* asMethod() {
00057 return dynamic_cast<MethodType*>(frame->getBody().get());
00058 }
00059 shared_ptr<framing::AMQFrame> frame;
00060 Sender sender;
00061 };
00062
00063 typedef std::vector<TaggedFrame> Conversation;
00064
00065 InProcessBroker(framing::ProtocolVersion ver=
00066 framing::highestProtocolVersion
00067 ) :
00068 Connector(ver),
00069 protocolInit(ver),
00070 broker(broker::Broker::create()),
00071 brokerOut(BROKER, conversation),
00072 brokerConnection(&brokerOut, *broker),
00073 clientOut(CLIENT, conversation, &brokerConnection)
00074 {}
00075
00076 ~InProcessBroker() { broker->shutdown(); }
00077
00078 void connect(const std::string& , int ) {}
00079 void init() { brokerConnection.initiated(protocolInit); }
00080 void close() {}
00081
00083 void setInputHandler(framing::InputHandler* handler) {
00084 brokerOut.in = handler;
00085 }
00086
00088 void send(framing::AMQFrame* frame) {
00089 clientOut.send(frame);
00090 }
00091
00093 Conversation conversation;
00094
00095 private:
00097 struct OutputToInputHandler : public sys::ConnectionOutputHandler {
00098 OutputToInputHandler(
00099 Sender sender_, Conversation& conversation_,
00100 framing::InputHandler* ih=0
00101 ) : sender(sender_), conversation(conversation_), in(ih) {}
00102
00103 void send(framing::AMQFrame* frame) {
00104 conversation.push_back(TaggedFrame(sender, frame));
00105 in->received(frame);
00106 }
00107
00108 void close() {}
00109
00110 Sender sender;
00111 Conversation& conversation;
00112 framing::InputHandler* in;
00113 };
00114
00115 framing::ProtocolInitiation protocolInit;
00116 Broker::shared_ptr broker;
00117 OutputToInputHandler brokerOut;
00118 broker::Connection brokerConnection;
00119 OutputToInputHandler clientOut;
00120 };
00121
00122 std::ostream& operator<<(
00123 std::ostream& out, const InProcessBroker::TaggedFrame& tf)
00124 {
00125 return out << (tf.fromBroker()? "BROKER: ":"CLIENT: ") << *tf.frame;
00126 }
00127
00128 std::ostream& operator<<(
00129 std::ostream& out, const InProcessBroker::Conversation& conv)
00130 {
00131 copy(conv.begin(), conv.end(),
00132 std::ostream_iterator<InProcessBroker::TaggedFrame>(out, "\n"));
00133 return out;
00134 }
00135
00136 }
00137
00138
00139 namespace client {
00141 class InProcessBrokerClient : public client::Connection {
00142 public:
00143 broker::InProcessBroker broker;
00144 broker::InProcessBroker::Conversation& conversation;
00145
00147 InProcessBrokerClient(
00148 u_int32_t max_frame_size=65536,
00149 framing::ProtocolVersion version= framing::highestProtocolVersion
00150 ) : client::Connection(false, max_frame_size, version),
00151 broker(version),
00152 conversation(broker.conversation)
00153 {
00154 setConnector(broker);
00155 open("");
00156 }
00157 };
00158
00159
00160 }}
00161
00162
00163 #endif // _tests_InProcessBroker_h