Connection.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
19 
20 #ifndef Q_MOC_RUN
21 #include <tbb/task.h>
22 #endif
23 #include <google/protobuf/message.h>
24 
25 #include <boost/asio.hpp>
26 #include <boost/bind.hpp>
27 #include <boost/function.hpp>
28 #include <boost/thread.hpp>
29 #include <boost/tuple/tuple.hpp>
30 
31 #include <string>
32 #include <vector>
33 #include <iostream>
34 #include <iomanip>
35 #include <deque>
36 #include <utility>
37 
38 #include "gazebo/common/Event.hh"
39 #include "gazebo/common/Console.hh"
41 #include "gazebo/util/system.hh"
42 
/// \brief Size, in chars, that Connection::AsyncRead gives the inbound
/// header buffer before reading a message header from the socket.
43 #define HEADER_LENGTH 8
44 
45 namespace gazebo
46 {
47  namespace transport
48  {
/// \brief Is the transport system stopped?
/// Declared here, defined elsewhere.
49  extern GZ_TRANSPORT_VISIBLE bool is_stopped();
50 
/// \brief Forward declaration. Manages boost::asio IO.
51  class IOManager;
/// \brief Forward declaration, needed by the ConnectionPtr typedef below.
52  class Connection;
/// \brief Boost shared pointer to a Connection.
53  typedef boost::shared_ptr<Connection> ConnectionPtr;
54 
58  class GZ_TRANSPORT_VISIBLE ConnectionReadTask : public tbb::task
59  {
64  public: ConnectionReadTask(
65  boost::function<void (const std::string &)> _func,
66  const std::string &_data) :
67  func(_func),
68  data(_data)
69  {
70  }
71 
74  public: tbb::task *execute()
75  {
76  this->func(this->data);
77  return NULL;
78  }
79 
81  private: boost::function<void (const std::string &)> func;
82 
84  private: std::string data;
85  };
87 
102  class GZ_TRANSPORT_VISIBLE Connection :
103  public boost::enable_shared_from_this<Connection>
104  {
106  public: Connection();
107 
109  public: virtual ~Connection();
110 
115  public: bool Connect(const std::string &_host, unsigned int _port);
116 
118  typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
119 
124  public: void Listen(unsigned int _port, const AcceptCallback &_acceptCB);
125 
127  typedef boost::function<void(const std::string &_data)> ReadCallback;
128 
132  public: void StartRead(const ReadCallback &_cb);
133 
135  public: void StopRead();
136 
138  public: void Shutdown();
139 
142  public: bool IsOpen() const;
143 
145  private: void Close();
146 
148  public: void Cancel();
149 
153  public: bool Read(std::string &_data);
154 
162  public: void EnqueueMsg(const std::string &_buffer,
163  boost::function<void(uint32_t)> _cb, uint32_t _id,
164  bool _force = false);
165 
170  public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
171 
174  public: std::string GetLocalURI() const;
175 
178  public: std::string GetRemoteURI() const;
179 
182  public: std::string GetLocalAddress() const;
183 
186  public: unsigned int GetLocalPort() const;
187 
190  public: std::string GetRemoteAddress() const;
191 
194  public: unsigned int GetRemotePort() const;
195 
198  public: std::string GetRemoteHostname() const;
199 
202  public: static std::string GetLocalHostname();
203 
206  public: template<typename Handler>
207  void AsyncRead(Handler _handler)
208  {
209  boost::mutex::scoped_lock lock(this->socketMutex);
210  if (!this->IsOpen())
211  {
212  gzerr << "AsyncRead on a closed socket\n";
213  return;
214  }
215 
216  void (Connection::*f)(const boost::system::error_code &,
217  boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
218 
219  this->inboundHeader.resize(HEADER_LENGTH);
220  boost::asio::async_read(*this->socket,
221  boost::asio::buffer(this->inboundHeader),
222  boost::bind(f, this,
223  boost::asio::placeholders::error,
224  boost::make_tuple(_handler)));
225  }
226 
234  private: template<typename Handler>
235  void OnReadHeader(const boost::system::error_code &_e,
236  boost::tuple<Handler> _handler)
237  {
238  if (_e)
239  {
240  if (_e.value() == boost::asio::error::eof)
241  this->isOpen = false;
242  }
243  else
244  {
245  std::size_t inboundData_size = 0;
246  std::string header(&this->inboundHeader[0],
247  this->inboundHeader.size());
248  this->inboundHeader.clear();
249 
250  inboundData_size = this->ParseHeader(header);
251 
252  if (inboundData_size > 0)
253  {
254  // Start the asynchronous call to receive data
255  this->inboundData.resize(inboundData_size);
256 
257  void (Connection::*f)(const boost::system::error_code &e,
258  boost::tuple<Handler>) =
259  &Connection::OnReadData<Handler>;
260 
261  boost::asio::async_read(*this->socket,
262  boost::asio::buffer(this->inboundData),
263  boost::bind(f, this,
264  boost::asio::placeholders::error,
265  _handler));
266  }
267  else
268  {
269  gzerr << "Header is empty\n";
270  boost::get<0>(_handler)("");
271  // This code tries to read the header again. We should
272  // never get here.
273  // this->inboundHeader.resize(HEADER_LENGTH);
274 
275  // void (Connection::*f)(const boost::system::error_code &,
276  // boost::tuple<Handler>) =
277  // &Connection::OnReadHeader<Handler>;
278 
279  // boost::asio::async_read(*this->socket,
280  // boost::asio::buffer(this->inboundHeader),
281  // boost::bind(f, this,
282  // boost::asio::placeholders::error, _handler));
283  }
284  }
285  }
286 
294  private: template<typename Handler>
295  void OnReadData(const boost::system::error_code &_e,
296  boost::tuple<Handler> _handler)
297  {
298  if (_e)
299  {
300  if (_e.value() == boost::asio::error::eof)
301  this->isOpen = false;
302  }
303 
304  // Inform caller that data has been received
305  std::string data(&this->inboundData[0],
306  this->inboundData.size());
307  this->inboundData.clear();
308 
309  if (data.empty())
310  gzerr << "OnReadData got empty data!!!\n";
311 
312  if (!_e && !transport::is_stopped())
313  {
314  ConnectionReadTask *task = new(tbb::task::allocate_root())
315  ConnectionReadTask(boost::get<0>(_handler), data);
316  tbb::task::enqueue(*task);
317 
318  // Non-tbb version:
319  // boost::get<0>(_handler)(data);
320  }
321  }
322 
326  public: event::ConnectionPtr ConnectToShutdown(boost::function<void()>
327  _subscriber)
328  { return this->shutdown.Connect(_subscriber); }
329 
331  public: void ProcessWriteQueue(bool _blocking = false);
332 
335  public: unsigned int GetId() const;
336 
340  public: static bool ValidateIP(const std::string &_ip);
341 
345  public: std::string GetIPWhiteList() const;
346 
349  private: void PostWrite();
350 
354  private: void OnWrite(const boost::system::error_code &_e);
355 
358  private: void OnAccept(const boost::system::error_code &_e);
359 
362  private: std::size_t ParseHeader(const std::string &_header);
363 
365  private: void ReadLoop(const ReadCallback &_cb);
366 
369  private: static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
370 
373  private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
374 
377  private: static std::string GetHostname(
378  boost::asio::ip::tcp::endpoint _ep);
379 
383  private: void OnConnect(const boost::system::error_code &_error,
384  boost::asio::ip::tcp::resolver::iterator _endPointIter);
385 
387  private: boost::asio::ip::tcp::socket *socket;
388 
390  private: boost::asio::ip::tcp::acceptor *acceptor;
391 
393  private: std::deque<std::string> writeQueue;
394 
397  private: std::deque< std::vector<
398  std::pair<boost::function<void(uint32_t)>, uint32_t> > >
399  callbacks;
400 
402  private: boost::mutex connectMutex;
403 
405  private: boost::recursive_mutex writeMutex;
406 
408  private: boost::recursive_mutex readMutex;
409 
411  private: mutable boost::mutex socketMutex;
412 
414  private: boost::condition_variable connectCondition;
415 
417  private: AcceptCallback acceptCB;
418 
420  private: std::vector<char> inboundHeader;
421 
423  private: std::vector<char> inboundData;
424 
426  private: bool readQuit;
427 
429  private: unsigned int id;
430 
432  private: static unsigned int idCounter;
433 
435  private: ConnectionPtr acceptConn;
436 
438  private: event::EventT<void()> shutdown;
439 
441  private: static IOManager *iomanager;
442 
444  private: unsigned int writeCount;
445 
447  private: std::string localURI;
448 
450  private: std::string localAddress;
451 
453  private: std::string remoteURI;
454 
456  private: std::string remoteAddress;
457 
459  private: bool connectError;
460 
462  private: std::string ipWhiteList;
463 
465  private: bool dropMsgLogged;
466 
468  private: bool isOpen;
469  };
471  }
472 }
473 #endif
boost::function< void(const ConnectionPtr &)> AcceptCallback
The signature of a connection accept callback.
Definition: Connection.hh:118
Forward declarations for the common classes.
Definition: Animation.hh:26
boost::function< void(const std::string &_data)> ReadCallback
The signature of a connection read callback.
Definition: Connection.hh:127
#define HEADER_LENGTH
Definition: Connection.hh:43
#define gzerr
Output an error message.
Definition: Console.hh:50
Manages boost::asio IO.
Definition: IOManager.hh:33
event::ConnectionPtr ConnectToShutdown(boost::function< void()> _subscriber)
Register a function to be called when the connection is shut down.
Definition: Connection.hh:326
boost::shared_ptr< Connection > ConnectionPtr
Definition: Connection.hh:52
GAZEBO_VISIBLE bool shutdown()
Stop and cleanup simulation.
boost::shared_ptr< Connection > ConnectionPtr
Definition: CommonTypes.hh:134
#define NULL
Definition: CommonTypes.hh:31
void AsyncRead(Handler _handler)
Perform an asynchronous read. param[in] _handler: Callback to invoke on received data.
Definition: Connection.hh:207
A class for event processing.
Definition: Event.hh:97
bool is_stopped()
Is the transport system stopped?
Single TCP/IP connection manager.
Definition: Connection.hh:102