Fawkes API  Fawkes Development Version
client.cpp
1 
2 /***************************************************************************
3  * client.cpp - Protobuf stream protocol - client
4  *
5  * Created: Thu Jan 31 17:38:04 2013
6  * Copyright 2013 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include <protobuf_comm/client.h>
38 
39 #include <boost/lexical_cast.hpp>
40 
41 using namespace boost::asio;
42 using namespace boost::system;
43 
44 namespace protobuf_comm {
45 
46 /** @class ProtobufStreamClient <protobuf_comm/client.h>
47  * Stream client for protobuf message transmission.
48  * The client opens a TCP connection (IPv4) to a specified server and
49  * send and receives messages to the remote.
50  * @author Tim Niemueller
51  */
52 
53 /** Constructor. */
54 ProtobufStreamClient::ProtobufStreamClient()
55 : resolver_(io_service_), socket_(io_service_), io_service_work_(io_service_)
56 {
57  message_register_ = new MessageRegister();
58  own_message_register_ = true;
59  connected_ = false;
60  outbound_active_ = false;
61  in_data_size_ = 1024;
62  frame_header_version_ = PB_FRAME_V2;
63  in_frame_header_size_ = sizeof(frame_header_t);
64  in_frame_header_ = malloc(in_frame_header_size_);
65  in_data_ = malloc(in_data_size_);
66  run_asio();
67 }
68 
69 /** Constructor.
70  * @param proto_path file paths to search for proto files. All message types
71  * within these files will automatically be registered and available for dynamic
72  * message creation.
73  */
74 ProtobufStreamClient::ProtobufStreamClient(std::vector<std::string> &proto_path)
75 : resolver_(io_service_), socket_(io_service_), io_service_work_(io_service_)
76 {
77  message_register_ = new MessageRegister(proto_path);
78  own_message_register_ = true;
79  connected_ = false;
80  outbound_active_ = false;
81  in_data_size_ = 1024;
82  in_data_ = malloc(in_data_size_);
83  frame_header_version_ = PB_FRAME_V2;
84  in_frame_header_size_ = sizeof(frame_header_t);
85  in_frame_header_ = malloc(in_frame_header_size_);
86  run_asio();
87 }
88 
89 /** Constructor.
90  * @param mr message register to use to (de)serialize messages
91  * @param header_version protobuf protocol frame header version to use,
92  */
94  frame_header_version_t header_version)
95 : resolver_(io_service_),
96  socket_(io_service_),
97  io_service_work_(io_service_),
98  message_register_(mr),
99  own_message_register_(false),
100  frame_header_version_(header_version)
101 {
102  connected_ = false;
103  outbound_active_ = false;
104  in_data_size_ = 1024;
105  in_data_ = malloc(in_data_size_);
106  if (frame_header_version_ == PB_FRAME_V1) {
107  in_frame_header_size_ = sizeof(frame_header_v1_t);
108  } else {
109  in_frame_header_size_ = sizeof(frame_header_t);
110  }
111  in_frame_header_ = malloc(in_frame_header_size_);
112  run_asio();
113 }
114 
115 /** Destructor. */
117 {
118  disconnect_nosig();
119  io_service_.stop();
120  asio_thread_.join();
121  free(in_data_);
122  free(in_frame_header_);
123  if (own_message_register_) {
124  delete message_register_;
125  }
126 }
127 
128 #if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 6))
129 static void
130 run_asio_thread(boost::asio::io_service &io_service)
131 {
132  io_service.run();
133 }
134 #endif
135 
136 void
137 ProtobufStreamClient::run_asio()
138 {
139 #if defined(__GNUC__) && (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 6))
140  asio_thread_ = std::thread(run_asio_thread, std::ref(io_service_));
141 #else
142  asio_thread_ = std::thread([this]() { this->io_service_.run(); });
143 #endif
144 }
145 
146 /** Asynchronous connect.
147  * This triggers connection establishment. The method does not block,
148  * i.e. it returns immediately and does not wait for the connection to
149  * be established.
150  * @param host host to connect to
151  * @param port TCP port to connect to
152  */
153 void
154 ProtobufStreamClient::async_connect(const char *host, unsigned short port)
155 {
156  ip::tcp::resolver::query query(host, boost::lexical_cast<std::string>(port));
157  resolver_.async_resolve(query,
158  boost::bind(&ProtobufStreamClient::handle_resolve,
159  this,
160  boost::asio::placeholders::error,
161  boost::asio::placeholders::iterator));
162 }
163 
164 void
165 ProtobufStreamClient::handle_resolve(const boost::system::error_code &err,
166  ip::tcp::resolver::iterator endpoint_iterator)
167 {
168  if (!err) {
169  // Attempt a connection to each endpoint in the list until we
170  // successfully establish a connection.
171 #if BOOST_ASIO_VERSION > 100409
172  boost::asio::async_connect(socket_,
173  endpoint_iterator,
174 #else
175  socket_.async_connect(*endpoint_iterator,
176 #endif
177  boost::bind(&ProtobufStreamClient::handle_connect,
178  this,
179  boost::asio::placeholders::error));
180  } else {
181  disconnect_nosig();
182  sig_disconnected_(err);
183  }
184 }
185 
186 void
187 ProtobufStreamClient::handle_connect(const boost::system::error_code &err)
188 {
189  if (!err) {
190 #if BOOST_VERSION >= 105400 && BOOST_VERSION < 105500
191  // Boost 1.54 has a bug that causes async_connect to report success
192  // if it cannot connect at all to the other side, cf.
193  // https://svn.boost.org/trac/boost/ticket/8795
194  // Work around by explicitly checking for connected status
195  boost::system::error_code ec;
196  socket_.remote_endpoint(ec);
197  if (ec == boost::system::errc::not_connected) {
198  disconnect_nosig();
199  sig_disconnected_(ec);
200  return;
201  }
202 #endif
203  connected_ = true;
204  start_recv();
205  sig_connected_();
206  } else {
207  disconnect_nosig();
208  sig_disconnected_(err);
209  }
210 }
211 
212 void
213 ProtobufStreamClient::disconnect_nosig()
214 {
215  boost::system::error_code err;
216  if (socket_.is_open()) {
217  socket_.shutdown(ip::tcp::socket::shutdown_both, err);
218  socket_.close();
219  }
220  connected_ = false;
221 }
222 
223 /** Disconnect from remote host. */
224 void
226 {
227  disconnect_nosig();
228  sig_disconnected_(boost::system::error_code());
229 }
230 
231 void
232 ProtobufStreamClient::start_recv()
233 {
234  boost::asio::async_read(socket_,
235  boost::asio::buffer(in_frame_header_, in_frame_header_size_),
236  boost::bind(&ProtobufStreamClient::handle_read_header,
237  this,
238  boost::asio::placeholders::error));
239 }
240 
241 void
242 ProtobufStreamClient::handle_read_header(const boost::system::error_code &error)
243 {
244  if (!error) {
245  size_t to_read;
246  if (frame_header_version_ == PB_FRAME_V1) {
247  frame_header_v1_t *frame_header = (frame_header_v1_t *)in_frame_header_;
248  to_read = ntohl(frame_header->payload_size);
249  } else {
250  frame_header_t *frame_header = (frame_header_t *)in_frame_header_;
251  to_read = ntohl(frame_header->payload_size);
252  }
253  if (to_read > in_data_size_) {
254  void *new_data = realloc(in_data_, to_read);
255  if (new_data) {
256  in_data_size_ = to_read;
257  in_data_ = new_data;
258  } else {
259  disconnect_nosig();
260  sig_disconnected_(errc::make_error_code(errc::not_enough_memory));
261  }
262  }
263  // setup new read
264  boost::asio::async_read(socket_,
265  boost::asio::buffer(in_data_, to_read),
266  boost::bind(&ProtobufStreamClient::handle_read_message,
267  this,
268  boost::asio::placeholders::error));
269  } else {
270  disconnect_nosig();
271  sig_disconnected_(error);
272  }
273 }
274 
275 void
276 ProtobufStreamClient::handle_read_message(const boost::system::error_code &error)
277 {
278  if (!error) {
279  frame_header_t frame_header;
280  message_header_t message_header;
281  void * data;
282 
283  if (frame_header_version_ == PB_FRAME_V1) {
284  frame_header_v1_t *frame_header_v1 = (frame_header_v1_t *)in_frame_header_;
285  frame_header.header_version = PB_FRAME_V1;
286  frame_header.cipher = PB_ENCRYPTION_NONE;
287  frame_header.payload_size =
288  htonl(ntohl(frame_header_v1->payload_size) + sizeof(message_header_t));
289  message_header.component_id = frame_header_v1->component_id;
290  message_header.msg_type = frame_header_v1->msg_type;
291  data = in_data_;
292  } else {
293  memcpy(&frame_header, in_frame_header_, sizeof(frame_header_t));
294 
295  message_header_t *msg_header = static_cast<message_header_t *>(in_data_);
296  message_header.component_id = msg_header->component_id;
297  message_header.msg_type = msg_header->msg_type;
298 
299  data = (char *)in_data_ + sizeof(message_header);
300  }
301 
302  uint16_t comp_id = ntohs(message_header.component_id);
303  uint16_t msg_type = ntohs(message_header.msg_type);
304  try {
305  std::shared_ptr<google::protobuf::Message> m =
306  message_register_->deserialize(frame_header, message_header, data);
307 
308  sig_rcvd_(comp_id, msg_type, m);
309  } catch (std::runtime_error &e) {
310  sig_recv_failed_(comp_id, msg_type, e.what());
311  }
312 
313  start_recv();
314  } else {
315  disconnect_nosig();
316  sig_disconnected_(error);
317  }
318 }
319 
320 void
321 ProtobufStreamClient::handle_write(const boost::system::error_code &error,
322  size_t /*bytes_transferred*/,
323  QueueEntry *entry)
324 {
325  delete entry;
326 
327  if (!error) {
328  std::lock_guard<std::mutex> lock(outbound_mutex_);
329  if (!outbound_queue_.empty()) {
330  QueueEntry *front_entry = outbound_queue_.front();
331  outbound_queue_.pop();
332  boost::asio::async_write(socket_,
333  front_entry->buffers,
334  boost::bind(&ProtobufStreamClient::handle_write,
335  this,
336  boost::asio::placeholders::error,
337  boost::asio::placeholders::bytes_transferred,
338  front_entry));
339  } else {
340  outbound_active_ = false;
341  }
342  } else {
343  disconnect_nosig();
344  sig_disconnected_(error);
345  }
346 }
347 
348 /** Send a message to the server.
349  * @param component_id ID of the component to address
350  * @param msg_type numeric message type
351  * @param m message to send
352  */
353 void
354 ProtobufStreamClient::send(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
355 {
356  if (!connected_) {
357  throw std::runtime_error("Cannot send while not connected");
358  }
359 
360  QueueEntry *entry = new QueueEntry();
361  message_register_->serialize(component_id,
362  msg_type,
363  m,
364  entry->frame_header,
365  entry->message_header,
366  entry->serialized_message);
367 
368  if (frame_header_version_ == PB_FRAME_V1) {
372  htonl(ntohl(entry->frame_header.payload_size) - sizeof(message_header_t));
373 
374  entry->buffers[0] = boost::asio::buffer(&entry->frame_header_v1, sizeof(frame_header_v1_t));
375  entry->buffers[1] = boost::asio::const_buffer();
376  } else {
377  entry->buffers[0] = boost::asio::buffer(&entry->frame_header, sizeof(frame_header_t));
378  entry->buffers[1] = boost::asio::buffer(&entry->message_header, sizeof(message_header_t));
379  }
380  entry->buffers[2] = boost::asio::buffer(entry->serialized_message);
381 
382  std::lock_guard<std::mutex> lock(outbound_mutex_);
383  if (outbound_active_) {
384  outbound_queue_.push(entry);
385  } else {
386  outbound_active_ = true;
387  boost::asio::async_write(socket_,
388  entry->buffers,
389  boost::bind(&ProtobufStreamClient::handle_write,
390  this,
391  boost::asio::placeholders::error,
392  boost::asio::placeholders::bytes_transferred,
393  entry));
394  }
395 }
396 
397 /** Send a message to the server.
398  * @param m message to send, the message must be of a type with a suitable CompType
399  * enum indicating component ID and message type.
400  */
401 void
402 ProtobufStreamClient::send(google::protobuf::Message &m)
403 {
404  const google::protobuf::Descriptor * desc = m.GetDescriptor();
405  const google::protobuf::EnumDescriptor *enumdesc = desc->FindEnumTypeByName("CompType");
406  if (!enumdesc) {
407  throw std::logic_error("Message does not have CompType enum");
408  }
409  const google::protobuf::EnumValueDescriptor *compdesc = enumdesc->FindValueByName("COMP_ID");
410  const google::protobuf::EnumValueDescriptor *msgtdesc = enumdesc->FindValueByName("MSG_TYPE");
411  if (!compdesc || !msgtdesc) {
412  throw std::logic_error("Message CompType enum hs no COMP_ID or MSG_TYPE value");
413  }
414  int comp_id = compdesc->number();
415  int msg_type = msgtdesc->number();
416  if (comp_id < 0 || comp_id > std::numeric_limits<uint16_t>::max()) {
417  throw std::logic_error("Message has invalid COMP_ID");
418  }
419  if (msg_type < 0 || msg_type > std::numeric_limits<uint16_t>::max()) {
420  throw std::logic_error("Message has invalid MSG_TYPE");
421  }
422 
423  send(comp_id, msg_type, m);
424 }
425 
426 /** Send a message to the server.
427  * @param m message to send, the message must be of a type with a suitable CompType
428  * enum indicating component ID and message type.
429  */
430 void
431 ProtobufStreamClient::send(std::shared_ptr<google::protobuf::Message> m)
432 {
433  send(*m);
434 }
435 
436 } // end namespace protobuf_comm
uint16_t msg_type
message type
Definition: frame_header.h:102
Old network message framing header.
Definition: frame_header.h:115
Outgoing queue entry.
Definition: queue_entry.h:46
uint32_t payload_size
payload size in bytes
Definition: frame_header.h:122
Register to map msg type numbers to Protobuf messages.
std::shared_ptr< google::protobuf::Message > deserialize(frame_header_t &frame_header, message_header_t &message_header, void *data)
Deserialize message.
Network framing header.
Definition: frame_header.h:71
std::array< boost::asio::const_buffer, 3 > buffers
outgoing buffers
Definition: queue_entry.h:59
void disconnect()
Disconnect from remote host.
Definition: client.cpp:225
void serialize(uint16_t component_id, uint16_t msg_type, const google::protobuf::Message &msg, frame_header_t &frame_header, message_header_t &message_header, std::string &data)
Serialize a message.
uint32_t payload_size
payload size in bytes includes message and header, not IV
Definition: frame_header.h:84
uint16_t component_id
component id
Definition: frame_header.h:118
frame_header_t frame_header
Frame header (network byte order), never encrypted.
Definition: queue_entry.h:56
void send(uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the server.
Definition: client.cpp:354
void async_connect(const char *host, unsigned short port)
Asynchronous connect.
Definition: client.cpp:154
uint16_t msg_type
message type
Definition: frame_header.h:120
Network message header.
Definition: frame_header.h:97
uint16_t component_id
component id
Definition: frame_header.h:100
frame_header_v1_t frame_header_v1
Frame header (network byte order), never encrypted.
Definition: queue_entry.h:57
message_header_t message_header
Frame header (network byte order)
Definition: queue_entry.h:58
std::string serialized_message
serialized protobuf message
Definition: queue_entry.h:54