/// NOTE(review): the next line is a flattened listing fragment. It fuses the
/// include guard, the Boost/protobuf includes, the HEADER_LENGTH definition
/// and the opening of the ConnectionReadTask class-head onto a single line.
///
/// ConnectionReadTask is a tbb::task that holds a callback and a string and,
/// when executed, calls the callback with that string.
17 #ifndef _CONNECTION_HH_ 18 #define _CONNECTION_HH_ 23 #include <google/protobuf/message.h> 25 #include <boost/asio.hpp> 26 #include <boost/bind.hpp> 27 #include <boost/function.hpp> 28 #include <boost/thread.hpp> 29 #include <boost/tuple/tuple.hpp> 43 #define HEADER_LENGTH 8 58 class GZ_TRANSPORT_VISIBLE ConnectionReadTask :
public tbb::task
/// Constructor.
/// _func: callback taking a const std::string reference.
/// _data: string payload for the callback.
/// NOTE(review): the initializer list is not visible in this listing;
/// presumably it stores the arguments in `func` and `data` - confirm.
64 public: ConnectionReadTask(
65 boost::function<
void (
const std::string &)> _func,
66 const std::string &_data) :
/// Task entry point: invokes the stored callback with the stored data.
/// The return statement is not visible in this listing.
74 public: tbb::task *execute()
76 this->func(this->data);
/// Callback invoked by execute().
81 private: boost::function<void (
const std::string &)> func;
/// Payload handed to `func` by execute().
84 private: std::string data;
/// Connection: single TCP/IP connection manager.
/// NOTE(review): the class-head line is missing from this listing; only the
/// base-class clause below (public enable_shared_from_this) is visible.
103 public boost::enable_shared_from_this<Connection>
/// Connect using a host name and port.
/// Returns a bool; presumably true on success - confirm in the implementation.
115 public:
bool Connect(
const std::string &_host,
unsigned int _port);
/// Listen on a port.
/// _acceptCB is an AcceptCallback, i.e. a function receiving a ConnectionPtr.
/// Presumably it is invoked for each accepted connection - confirm.
124 public:
void Listen(
unsigned int _port,
const AcceptCallback &_acceptCB);
/// Stop reading. Implementation not visible here.
135 public:
void StopRead();
/// Shut the connection down. Implementation not visible here.
138 public:
void Shutdown();
/// Query whether the connection is open (const; does not modify the object).
142 public:
bool IsOpen()
const;
/// Close the connection. Private; implementation not visible here.
145 private:
void Close();
/// Cancel; presumably cancels pending asynchronous operations - confirm.
148 public:
void Cancel();
/// Read into _data (output parameter, passed by non-const reference).
/// Returns a bool; presumably true when data was read - confirm.
153 public:
bool Read(std::string &_data);
/// Enqueue a message buffer together with a callback and an id.
/// _buffer: message contents.
/// _cb:     callback taking a uint32_t; presumably called with _id once the
///          message has been handled - confirm in the implementation.
/// _id:     identifier associated with the callback.
/// _force:  defaults to false; its exact effect is not visible here.
162 public:
void EnqueueMsg(
const std::string &_buffer,
163 boost::function<
void(uint32_t)> _cb, uint32_t _id,
164 bool _force =
false);
/// Enqueue a message buffer without a completion callback.
/// _buffer: message contents.
/// _force:  defaults to false; its exact effect is not visible here.
170 public:
void EnqueueMsg(
const std::string &_buffer,
bool _force =
false);
/// Accessors for the local and remote endpoints of the connection.
/// All are declarations only; the exact string formats are defined in the
/// implementation file and are not visible here.

/// Local URI as a string.
174 public: std::string GetLocalURI()
const;
/// Remote URI as a string.
178 public: std::string GetRemoteURI()
const;
/// Local address as a string.
182 public: std::string GetLocalAddress()
const;
/// Local port number.
186 public:
unsigned int GetLocalPort()
const;
/// Remote address as a string.
190 public: std::string GetRemoteAddress()
const;
/// Remote port number.
194 public:
unsigned int GetRemotePort()
const;
/// Remote host name as a string.
198 public: std::string GetRemoteHostname()
const;
/// Local host name as a string. Static: does not need a Connection instance.
202 public:
static std::string GetLocalHostname();
/// AsyncRead: start an asynchronous read of the message header.
/// Template parameter Handler is the type of the callback (_handler) that is
/// eventually invoked with the received data.
/// NOTE(review): the signature line, the braces, the open-socket check and
/// the boost::bind call are missing from this listing.
206 public:
template<
typename Handler>
// Hold the socket mutex for the rest of the scope.
209 boost::mutex::scoped_lock lock(this->socketMutex);
// Error reported when reading on a closed socket (the guarding condition is
// not visible in this listing).
212 gzerr <<
"AsyncRead on a closed socket\n";
// Pointer to the OnReadHeader<Handler> member function; naming the exact
// instantiation here gives the completion handler a concrete target.
216 void (
Connection::*f)(
const boost::system::error_code &,
217 boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
// Read into the inboundHeader buffer; the error code and the handler (wrapped
// in a tuple) are forwarded to the completion function.
220 boost::asio::async_read(*this->socket,
221 boost::asio::buffer(this->inboundHeader),
223 boost::asio::placeholders::error,
224 boost::make_tuple(_handler)));
/// OnReadHeader: completion handler for the header read started by AsyncRead.
/// _e:       error code from the asynchronous read.
/// _handler: one-element tuple holding the user callback.
/// NOTE(review): braces and the enclosing if/else lines are missing from this
/// listing, so the exact branch structure must be confirmed in the real file.
234 private:
template<
typename Handler>
235 void OnReadHeader(
const boost::system::error_code &_e,
236 boost::tuple<Handler> _handler)
// End-of-file from the peer marks the connection as no longer open.
240 if (_e.value() == boost::asio::error::eof)
241 this->isOpen =
false;
// Copy the raw header bytes into a string, then clear the buffer.
// NOTE(review): indexing [0] assumes inboundHeader is non-empty - confirm.
245 std::size_t inboundData_size = 0;
246 std::string header(&this->inboundHeader[0],
247 this->inboundHeader.size());
248 this->inboundHeader.clear();
// ParseHeader yields the size of the payload that follows the header.
250 inboundData_size = this->ParseHeader(header);
252 if (inboundData_size > 0)
// Size the payload buffer and start the asynchronous payload read, which
// completes in OnReadData<Handler>.
255 this->inboundData.resize(inboundData_size);
257 void (Connection::*f)(
const boost::system::error_code &e,
258 boost::tuple<Handler>) =
259 &Connection::OnReadData<Handler>;
261 boost::asio::async_read(*this->socket,
262 boost::asio::buffer(this->inboundData),
264 boost::asio::placeholders::error,
// Presumably the else-branch for a zero payload size (the `else` line is not
// visible): log the problem and invoke the user callback with an empty string.
269 gzerr <<
"Header is empty\n";
270 boost::get<0>(_handler)(
"");
/// OnReadData: completion handler for the payload read started by
/// OnReadHeader.
/// _e:       error code from the asynchronous read.
/// _handler: one-element tuple holding the user callback.
/// NOTE(review): braces and some conditions are missing from this listing.
294 private:
template<
typename Handler>
295 void OnReadData(
const boost::system::error_code &_e,
296 boost::tuple<Handler> _handler)
// End-of-file from the peer marks the connection as no longer open.
300 if (_e.value() == boost::asio::error::eof)
301 this->isOpen =
false;
// Copy the received bytes into a string, then clear the buffer.
// NOTE(review): indexing [0] assumes inboundData is non-empty - confirm.
305 std::string data(&this->inboundData[0],
306 this->inboundData.size());
307 this->inboundData.clear();
// Error reported for an empty payload (the guarding condition is not visible
// in this listing).
310 gzerr <<
"OnReadData got empty data!!!\n";
// Wrap the user callback and the data in a ConnectionReadTask allocated as a
// TBB root task, then enqueue it so the callback runs via the TBB scheduler.
314 ConnectionReadTask *task =
new(tbb::task::allocate_root())
315 ConnectionReadTask(boost::get<0>(_handler), data);
316 tbb::task::enqueue(*task);
/// Body of ConnectToShutdown (the signature line is missing from this
/// listing): registers _subscriber with the `shutdown` event member and
/// returns the resulting connection object.
328 {
return this->
shutdown.Connect(_subscriber); }
/// Process the write queue.
/// _blocking defaults to false; presumably selects a synchronous write when
/// true - confirm in the implementation.
331 public:
void ProcessWriteQueue(
bool _blocking =
false);
/// Return this connection's id.
335 public:
unsigned int GetId()
const;
/// Validate the IP string _ip. Static; the validation rules are not visible
/// here.
340 public:
static bool ValidateIP(
const std::string &_ip);
/// Return the IP white list as a string.
345 public: std::string GetIPWhiteList()
const;
/// Internal write helper. Implementation not visible here.
349 private:
void PostWrite();
/// Write completion handler; _e is the error code of the write.
354 private:
void OnWrite(
const boost::system::error_code &_e);
/// Accept completion handler; _e is the error code of the accept.
358 private:
void OnAccept(
const boost::system::error_code &_e);
/// Parse a header string and return a size. OnReadHeader uses the result as
/// the number of payload bytes to read.
362 private: std::size_t ParseHeader(
const std::string &_header);
/// Read loop that takes a ReadCallback (a function receiving the data
/// string). Implementation not visible here.
365 private:
void ReadLoop(
const ReadCallback &_cb);
/// Return the local TCP endpoint. Static.
369 private:
static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
/// Return the remote TCP endpoint of this connection.
373 private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint()
const;
/// Return a host name for the endpoint _ep. Static.
377 private:
static std::string GetHostname(
378 boost::asio::ip::tcp::endpoint _ep);
/// Connect completion handler.
/// _error:        error code of the connect attempt.
/// _endPointIter: resolver iterator; presumably the endpoint that was tried
///                or the next one to try - confirm.
383 private:
void OnConnect(
const boost::system::error_code &_error,
384 boost::asio::ip::tcp::resolver::iterator _endPointIter);
/// TCP socket used by the asynchronous reads. Raw pointer; ownership is not
/// visible in this listing.
387 private: boost::asio::ip::tcp::socket *socket;
/// TCP acceptor. Raw pointer; ownership is not visible in this listing.
390 private: boost::asio::ip::tcp::acceptor *acceptor;
/// Queue of outgoing message strings.
393 private: std::deque<std::string> writeQueue;
/// Queue of (callback, id) pair lists.
/// NOTE(review): the member name line is missing from this listing.
397 private: std::deque< std::vector<
398 std::pair<boost::function<void(uint32_t)>, uint32_t> > >
/// Mutex; presumably paired with connectCondition - confirm.
402 private: boost::mutex connectMutex;
/// Recursive mutex; presumably guards the write path - confirm.
405 private: boost::recursive_mutex writeMutex;
/// Recursive mutex; presumably guards the read path - confirm.
408 private: boost::recursive_mutex readMutex;
/// Mutex locked by AsyncRead before it touches the socket. Mutable so that
/// const member functions can lock it.
411 private:
mutable boost::mutex socketMutex;
/// Condition variable; presumably signals connect completion - confirm.
414 private: boost::condition_variable connectCondition;
/// Stored accept callback (see Listen).
417 private: AcceptCallback acceptCB;
/// Buffer for the incoming header: filled by AsyncRead, consumed and cleared
/// by OnReadHeader.
420 private: std::vector<char> inboundHeader;
/// Buffer for the incoming payload: resized by OnReadHeader, consumed and
/// cleared by OnReadData.
423 private: std::vector<char> inboundData;
/// Flag; presumably tells the read loop to stop - confirm.
426 private:
bool readQuit;
/// Id of this connection (see GetId).
429 private:
unsigned int id;
/// Static counter; presumably the source of new ids - confirm.
432 private:
static unsigned int idCounter;
/// Write counter; exact meaning not visible here.
444 private:
unsigned int writeCount;
/// Local URI string (see GetLocalURI).
447 private: std::string localURI;
/// Local address string (see GetLocalAddress).
450 private: std::string localAddress;
/// Remote URI string (see GetRemoteURI).
453 private: std::string remoteURI;
/// Remote address string (see GetRemoteAddress).
456 private: std::string remoteAddress;
/// Flag; presumably set when a connect attempt fails - confirm.
459 private:
bool connectError;
/// IP white list string (see GetIPWhiteList).
462 private: std::string ipWhiteList;
/// Flag; presumably records that a dropped-message warning was already
/// logged - confirm.
465 private:
bool dropMsgLogged;
/// Open state of the connection; cleared by OnReadHeader and OnReadData when
/// the read reports end-of-file.
468 private:
bool isOpen;
boost::function< void(const ConnectionPtr &)> AcceptCallback
The signature of a connection accept callback.
Definition: Connection.hh:118
Forward declarations for the common classes.
Definition: Animation.hh:26
boost::function< void(const std::string &_data)> ReadCallback
The signature of a connection read callback.
Definition: Connection.hh:127
#define HEADER_LENGTH
Definition: Connection.hh:43
#define gzerr
Output an error message.
Definition: Console.hh:50
Manages boost::asio IO.
Definition: IOManager.hh:33
event::ConnectionPtr ConnectToShutdown(boost::function< void()> _subscriber)
Register a function to be called when the connection is shut down.
Definition: Connection.hh:326
boost::shared_ptr< Connection > ConnectionPtr
Definition: Connection.hh:52
GAZEBO_VISIBLE bool shutdown()
Stop and cleanup simulation.
boost::shared_ptr< Connection > ConnectionPtr
Definition: CommonTypes.hh:134
#define NULL
Definition: CommonTypes.hh:31
void AsyncRead(Handler _handler)
Perform an asynchronous read. \param[in] _handler Callback to invoke on received data.
Definition: Connection.hh:207
A class for event processing.
Definition: Event.hh:97
bool is_stopped()
Is the transport system stopped?
Single TCP/IP connection manager.
Definition: Connection.hh:102