/home/aconway/svn/qpid/cpp/src/tests/InProcessBroker.h

00001 #ifndef _tests_InProcessBroker_h
00002 #define _tests_InProcessBroker_h
00003 
00004 /*
00005  *
00006  * Copyright (c) 2006 The Apache Software Foundation
00007  *
00008  * Licensed under the Apache License, Version 2.0 (the "License");
00009  * you may not use this file except in compliance with the License.
00010  * You may obtain a copy of the License at
00011  *
00012  *    http://www.apache.org/licenses/LICENSE-2.0
00013  *
00014  * Unless required by applicable law or agreed to in writing, software
00015  * distributed under the License is distributed on an "AS IS" BASIS,
00016  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00017  * See the License for the specific language governing permissions and
00018  * limitations under the License.
00019  *
00020  */
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/client/Connector.h"
#include "qpid/client/Connection.h"

#include <algorithm>
#include <iostream>
#include <iterator>
#include <string>
#include <vector>
00031 
00032 
00033 namespace qpid {
00034 namespace broker {
00035 
00045 class InProcessBroker : public client::Connector {
00046   public:
00047     enum Sender {CLIENT,BROKER};
00048 
00050     struct TaggedFrame {
00051         TaggedFrame(Sender e, framing::AMQFrame* f) : frame(f), sender(e) {}
00052         bool fromBroker() const { return sender == BROKER; }
00053         bool fromClient() const { return sender == CLIENT; }
00054 
00055         template <class MethodType>
00056         MethodType* asMethod() {
00057             return dynamic_cast<MethodType*>(frame->getBody().get());
00058         }
00059         shared_ptr<framing::AMQFrame> frame;
00060         Sender sender;
00061     };
00062     
00063     typedef std::vector<TaggedFrame> Conversation;
00064 
00065     InProcessBroker(framing::ProtocolVersion ver=
00066                     framing::highestProtocolVersion
00067     ) :
00068         Connector(ver),
00069         protocolInit(ver),
00070         broker(broker::Broker::create()),
00071         brokerOut(BROKER, conversation),
00072         brokerConnection(&brokerOut, *broker),
00073         clientOut(CLIENT, conversation, &brokerConnection)
00074     {}
00075 
00076     ~InProcessBroker() { broker->shutdown(); }
00077 
00078     void connect(const std::string& /*host*/, int /*port*/) {}
00079     void init() { brokerConnection.initiated(protocolInit); }
00080     void close() {}
00081 
00083     void setInputHandler(framing::InputHandler* handler) {
00084         brokerOut.in = handler;
00085     }
00086 
00088     void send(framing::AMQFrame* frame) {
00089         clientOut.send(frame);
00090     }
00091 
00093     Conversation conversation;
00094 
00095   private:
00097     struct OutputToInputHandler : public sys::ConnectionOutputHandler {
00098         OutputToInputHandler(
00099             Sender sender_, Conversation& conversation_,
00100             framing::InputHandler* ih=0
00101         ) : sender(sender_), conversation(conversation_), in(ih) {}
00102 
00103         void send(framing::AMQFrame* frame) {
00104             conversation.push_back(TaggedFrame(sender, frame));
00105             in->received(frame);
00106         }
00107 
00108         void close() {}
00109         
00110         Sender sender;
00111         Conversation& conversation;
00112         framing::InputHandler* in;
00113     };
00114 
00115     framing::ProtocolInitiation protocolInit;
00116     Broker::shared_ptr  broker;
00117     OutputToInputHandler brokerOut;
00118     broker::Connection brokerConnection;
00119     OutputToInputHandler clientOut;
00120 };
00121 
00122 std::ostream& operator<<(
00123     std::ostream& out, const InProcessBroker::TaggedFrame& tf)
00124 {
00125     return out << (tf.fromBroker()? "BROKER: ":"CLIENT: ") << *tf.frame;
00126 }
00127 
00128 std::ostream& operator<<(
00129     std::ostream& out, const InProcessBroker::Conversation& conv)
00130 {    
00131     copy(conv.begin(), conv.end(),
00132          std::ostream_iterator<InProcessBroker::TaggedFrame>(out, "\n"));
00133     return out;
00134 }
00135 
00136 } // namespace broker
00137 
00138 
00139 namespace client {
00141 class InProcessBrokerClient : public client::Connection {
00142   public:
00143     broker::InProcessBroker broker;
00144     broker::InProcessBroker::Conversation& conversation;
00145     
00147     InProcessBrokerClient(
00148         u_int32_t max_frame_size=65536,
00149         framing::ProtocolVersion version= framing::highestProtocolVersion
00150     ) : client::Connection(false, max_frame_size, version),
00151         broker(version),
00152         conversation(broker.conversation)
00153     {
00154         setConnector(broker);
00155         open("");
00156     }
00157 };
00158 
00159 
00160 }} // namespace qpid::client
00161 
00162 
00163 #endif // _tests_InProcessBroker_h

Generated on Tue Apr 17 14:22:03 2007 for Qpid by  doxygen 1.4.7