Fawkes API  Fawkes Development Version
communicator.cpp
1 
2 /***************************************************************************
3  * communicator.cpp - protobuf network communication for CLIPS
4  *
5  * Created: Tue Apr 16 13:51:14 2013
6  * Copyright 2013 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include <core/threading/mutex_locker.h>
38 #include <google/protobuf/descriptor.h>
39 #include <logging/logger.h>
40 #include <protobuf_clips/communicator.h>
41 #include <protobuf_comm/client.h>
42 #include <protobuf_comm/peer.h>
43 #include <protobuf_comm/server.h>
44 
45 #include <boost/format.hpp>
46 
47 using namespace google::protobuf;
48 using namespace protobuf_comm;
49 
50 namespace protobuf_clips {
51 
52 /** @class ClipsProtobufCommunicator <protobuf_clips/communicator.h>
53  * CLIPS protobuf integration class.
54  * This class adds functionality related to protobuf to a given CLIPS
55  * environment. It supports the creation of communication channels
56  * through protobuf_comm. An instance maintains its own message register
57  * shared among server, peer, and clients.
58  * @author Tim Niemueller
59  */
60 
61 /** Constructor.
62  * @param env CLIPS environment to which to provide the protobuf functionality
63  * @param env_mutex mutex to lock when operating on the CLIPS environment.
64  * @param logger optional logger for informational output
65  */
66 ClipsProtobufCommunicator::ClipsProtobufCommunicator(CLIPS::Environment *env,
67  fawkes::Mutex & env_mutex,
68  fawkes::Logger * logger)
69 : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL), next_client_id_(0)
70 {
71  message_register_ = new MessageRegister();
72  setup_clips();
73 }
74 
75 /** Constructor.
76  * @param env CLIPS environment to which to provide the protobuf functionality
77  * @param env_mutex mutex to lock when operating on the CLIPS environment.
78  * @param proto_path proto path passed to a newly instantiated message register
79  * @param logger optional logger for informational output
80  */
82  fawkes::Mutex & env_mutex,
83  std::vector<std::string> &proto_path,
84  fawkes::Logger * logger)
85 : clips_(env), clips_mutex_(env_mutex), logger_(logger), server_(NULL), next_client_id_(0)
86 {
87  message_register_ = new MessageRegister(proto_path);
88  setup_clips();
89 }
90 
91 /** Destructor. */
93 {
94  {
95  fawkes::MutexLocker lock(&clips_mutex_);
96 
97  for (auto f : functions_) {
98  clips_->remove_function(f);
99  }
100  functions_.clear();
101  }
102 
103  for (auto c : clients_) {
104  delete c.second;
105  }
106  clients_.clear();
107 
108  delete message_register_;
109  delete server_;
110 }
111 
/* Register function n with slot s in the CLIPS environment and record the
 * name in functions_. Wrapped in do/while(0) so that the macro expands to
 * exactly one statement and is safe in unbraced if/else bodies; invocations
 * must be terminated with a semicolon.
 */
#define ADD_FUNCTION(n, s)          \
	do {                            \
		clips_->add_function(n, s); \
		functions_.push_back(n);    \
	} while (0)
115 
116 /** Setup CLIPS environment. */
117 void
118 ClipsProtobufCommunicator::setup_clips()
119 {
120  fawkes::MutexLocker lock(&clips_mutex_);
121 
122  ADD_FUNCTION("pb-register-type",
123  (sigc::slot<CLIPS::Value, std::string>(
124  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_register_type))));
125  ADD_FUNCTION("pb-field-names",
126  (sigc::slot<CLIPS::Values, void *>(
127  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_names))));
128  ADD_FUNCTION("pb-field-type",
129  (sigc::slot<CLIPS::Value, void *, std::string>(
130  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_type))));
131  ADD_FUNCTION("pb-has-field",
132  (sigc::slot<CLIPS::Value, void *, std::string>(
133  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_has_field))));
134  ADD_FUNCTION("pb-field-label",
135  (sigc::slot<CLIPS::Value, void *, std::string>(
136  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_label))));
137  ADD_FUNCTION("pb-field-value",
138  (sigc::slot<CLIPS::Value, void *, std::string>(
139  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_value))));
140  ADD_FUNCTION("pb-field-list",
141  (sigc::slot<CLIPS::Values, void *, std::string>(
142  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_list))));
143  ADD_FUNCTION("pb-field-is-list",
144  (sigc::slot<CLIPS::Value, void *, std::string>(
145  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_field_is_list))));
146  ADD_FUNCTION("pb-create",
147  (sigc::slot<CLIPS::Value, std::string>(
148  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_create))));
149  ADD_FUNCTION("pb-destroy",
150  (sigc::slot<void, void *>(
151  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_destroy))));
152  ADD_FUNCTION("pb-ref",
153  (sigc::slot<CLIPS::Value, void *>(
154  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_ref))));
155  ADD_FUNCTION("pb-set-field",
156  (sigc::slot<void, void *, std::string, CLIPS::Value>(
157  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_set_field))));
158  ADD_FUNCTION("pb-add-list",
159  (sigc::slot<void, void *, std::string, CLIPS::Value>(
160  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_add_list))));
161  ADD_FUNCTION("pb-send",
162  (sigc::slot<void, long int, void *>(
163  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_send))));
164  ADD_FUNCTION("pb-tostring",
165  (sigc::slot<std::string, void *>(
166  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_tostring))));
167  ADD_FUNCTION("pb-server-enable",
168  (sigc::slot<void, int>(
169  sigc::mem_fun(*this, &ClipsProtobufCommunicator::enable_server))));
170  ADD_FUNCTION("pb-server-disable",
171  (sigc::slot<void>(
172  sigc::mem_fun(*this, &ClipsProtobufCommunicator::disable_server))));
173  ADD_FUNCTION("pb-peer-create",
174  (sigc::slot<long int, std::string, int>(
175  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create))));
176  ADD_FUNCTION("pb-peer-create-local",
177  (sigc::slot<long int, std::string, int, int>(
178  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create_local))));
179  ADD_FUNCTION("pb-peer-create-crypto",
180  (sigc::slot<long int, std::string, int, std::string, std::string>(
181  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_create_crypto))));
182  ADD_FUNCTION("pb-peer-create-local-crypto",
183  (sigc::slot<long int, std::string, int, int, std::string, std::string>(sigc::mem_fun(
184  *this, &ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto))));
185  ADD_FUNCTION("pb-peer-destroy",
186  (sigc::slot<void, long int>(
187  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_destroy))));
188  ADD_FUNCTION("pb-peer-setup-crypto",
189  (sigc::slot<void, long int, std::string, std::string>(
190  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_peer_setup_crypto))));
191  ADD_FUNCTION("pb-broadcast",
192  (sigc::slot<void, long int, void *>(
193  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_broadcast))));
194  ADD_FUNCTION("pb-connect",
195  (sigc::slot<long int, std::string, int>(
196  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_client_connect))));
197  ADD_FUNCTION("pb-disconnect",
198  (sigc::slot<void, long int>(
199  sigc::mem_fun(*this, &ClipsProtobufCommunicator::clips_pb_disconnect))));
200 }
201 
202 /** Enable protobuf stream server.
203  * @param port TCP port to listen on for connections
204  */
205 void
207 {
208  if ((port > 0) && !server_) {
209  server_ = new protobuf_comm::ProtobufStreamServer(port, message_register_);
210 
211  server_->signal_connected().connect(
212  boost::bind(&ClipsProtobufCommunicator::handle_server_client_connected, this, _1, _2));
213  server_->signal_disconnected().connect(
214  boost::bind(&ClipsProtobufCommunicator::handle_server_client_disconnected, this, _1, _2));
215  server_->signal_received().connect(
216  boost::bind(&ClipsProtobufCommunicator::handle_server_client_msg, this, _1, _2, _3, _4));
217  server_->signal_receive_failed().connect(
218  boost::bind(&ClipsProtobufCommunicator::handle_server_client_fail, this, _1, _2, _3, _4));
219  }
220 }
221 
222 /** Disable protobu stream server. */
223 void
225 {
226  delete server_;
227  server_ = NULL;
228 }
229 
230 /** Enable protobuf peer.
231  * @param address IP address to send messages to
232  * @param send_port UDP port to send messages to
233  * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
234  * @param crypto_key encryption key
235  * @param cipher cipher suite, see BufferEncryptor for supported types
236  * @return peer identifier
237  */
238 long int
239 ClipsProtobufCommunicator::clips_pb_peer_create_local_crypto(std::string address,
240  int send_port,
241  int recv_port,
242  std::string crypto_key,
243  std::string cipher)
244 {
245  if (recv_port <= 0)
246  recv_port = send_port;
247 
248  if (send_port > 0) {
250  address, send_port, recv_port, message_register_, crypto_key, cipher);
251 
252  long int peer_id;
253  {
254  fawkes::MutexLocker lock(&map_mutex_);
255  peer_id = ++next_client_id_;
256  peers_[peer_id] = peer;
257  }
258 
259  peer->signal_received().connect(
260  boost::bind(&ClipsProtobufCommunicator::handle_peer_msg, this, peer_id, _1, _2, _3, _4));
261  peer->signal_recv_error().connect(
262  boost::bind(&ClipsProtobufCommunicator::handle_peer_recv_error, this, peer_id, _1, _2));
263  peer->signal_send_error().connect(
264  boost::bind(&ClipsProtobufCommunicator::handle_peer_send_error, this, peer_id, _1));
265 
266  return peer_id;
267  } else {
268  return 0;
269  }
270 }
271 
272 /** Enable protobuf peer.
273  * @param address IP address to send messages to
274  * @param port UDP port to send and receive messages
275  * @param crypto_key encryption key
276  * @param cipher cipher suite, see BufferEncryptor for supported types
277  * @return peer identifier
278  */
279 long int
280 ClipsProtobufCommunicator::clips_pb_peer_create_crypto(std::string address,
281  int port,
282  std::string crypto_key,
283  std::string cipher)
284 {
285  return clips_pb_peer_create_local_crypto(address, port, port, crypto_key, cipher);
286 }
287 
288 /** Enable protobuf peer.
289  * @param address IP address to send messages to
290  * @param port UDP port to send and receive messages
291  * @return peer identifier
292  */
293 long int
294 ClipsProtobufCommunicator::clips_pb_peer_create(std::string address, int port)
295 {
296  return clips_pb_peer_create_local_crypto(address, port, port);
297 }
298 
299 /** Enable protobuf peer.
300  * @param address IP address to send messages to
301  * @param send_port UDP port to send messages to
302  * @param recv_port UDP port to receive messages on, 0 to use the same as the @p send_port
303  * @return peer identifier
304  */
305 long int
306 ClipsProtobufCommunicator::clips_pb_peer_create_local(std::string address,
307  int send_port,
308  int recv_port)
309 {
310  return clips_pb_peer_create_local_crypto(address, send_port, recv_port);
311 }
312 
313 /** Disable peer.
314  * @param peer_id ID of the peer to destroy
315  */
316 void
317 ClipsProtobufCommunicator::clips_pb_peer_destroy(long int peer_id)
318 {
319  if (peers_.find(peer_id) != peers_.end()) {
320  delete peers_[peer_id];
321  peers_.erase(peer_id);
322  }
323 }
324 
325 /** Setup crypto for peer.
326  * @param peer_id ID of the peer to destroy
327  * @param crypto_key encryption key
328  * @param cipher cipher suite, see BufferEncryptor for supported types
329  */
330 void
331 ClipsProtobufCommunicator::clips_pb_peer_setup_crypto(long int peer_id,
332  std::string crypto_key,
333  std::string cipher)
334 {
335  if (peers_.find(peer_id) != peers_.end()) {
336  peers_[peer_id]->setup_crypto(crypto_key, cipher);
337  }
338 }
339 
340 /** Register a new message type.
341  * @param full_name full name of type to register
342  * @return true if the type was successfully registered, false otherwise
343  */
344 CLIPS::Value
345 ClipsProtobufCommunicator::clips_pb_register_type(std::string full_name)
346 {
347  try {
348  message_register_->add_message_type(full_name);
349  return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
350  } catch (std::runtime_error &e) {
351  if (logger_) {
352  logger_->log_error("CLIPS-Protobuf",
353  "Registering type %s failed: %s",
354  full_name.c_str(),
355  e.what());
356  }
357  return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
358  }
359 }
360 
361 CLIPS::Value
362 ClipsProtobufCommunicator::clips_pb_create(std::string full_name)
363 {
364  try {
365  std::shared_ptr<google::protobuf::Message> m = message_register_->new_message_for(full_name);
366  return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>(m));
367  } catch (std::runtime_error &e) {
368  if (logger_) {
369  logger_->log_warn("CLIPS-Protobuf",
370  "Cannot create message of type %s: %s",
371  full_name.c_str(),
372  e.what());
373  }
374  return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>());
375  }
376 }
377 
378 CLIPS::Value
379 ClipsProtobufCommunicator::clips_pb_ref(void *msgptr)
380 {
381  std::shared_ptr<google::protobuf::Message> *m =
382  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
383  if (!*m)
384  return new std::shared_ptr<google::protobuf::Message>();
385 
386  return CLIPS::Value(new std::shared_ptr<google::protobuf::Message>(*m));
387 }
388 
389 void
390 ClipsProtobufCommunicator::clips_pb_destroy(void *msgptr)
391 {
392  std::shared_ptr<google::protobuf::Message> *m =
393  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
394  if (!*m)
395  return;
396 
397  delete m;
398 }
399 
400 CLIPS::Values
401 ClipsProtobufCommunicator::clips_pb_field_names(void *msgptr)
402 {
403  std::shared_ptr<google::protobuf::Message> *m =
404  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
405  if (!*m)
406  return CLIPS::Values();
407 
408  const Descriptor *desc = (*m)->GetDescriptor();
409  const int field_count = desc->field_count();
410  CLIPS::Values field_names(field_count);
411  for (int i = 0; i < field_count; ++i) {
412  field_names[i].set(desc->field(i)->name(), true);
413  }
414  return field_names;
415 }
416 
417 CLIPS::Value
418 ClipsProtobufCommunicator::clips_pb_field_type(void *msgptr, std::string field_name)
419 {
420  std::shared_ptr<google::protobuf::Message> *m =
421  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
422  if (!*m)
423  return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
424 
425  const Descriptor * desc = (*m)->GetDescriptor();
426  const FieldDescriptor *field = desc->FindFieldByName(field_name);
427  if (!field) {
428  return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
429  }
430  switch (field->type()) {
431  case FieldDescriptor::TYPE_DOUBLE: return CLIPS::Value("DOUBLE", CLIPS::TYPE_SYMBOL);
432  case FieldDescriptor::TYPE_FLOAT: return CLIPS::Value("FLOAT", CLIPS::TYPE_SYMBOL);
433  case FieldDescriptor::TYPE_INT64: return CLIPS::Value("INT64", CLIPS::TYPE_SYMBOL);
434  case FieldDescriptor::TYPE_UINT64: return CLIPS::Value("UINT64", CLIPS::TYPE_SYMBOL);
435  case FieldDescriptor::TYPE_INT32: return CLIPS::Value("INT32", CLIPS::TYPE_SYMBOL);
436  case FieldDescriptor::TYPE_FIXED64: return CLIPS::Value("FIXED64", CLIPS::TYPE_SYMBOL);
437  case FieldDescriptor::TYPE_FIXED32: return CLIPS::Value("FIXED32", CLIPS::TYPE_SYMBOL);
438  case FieldDescriptor::TYPE_BOOL: return CLIPS::Value("BOOL", CLIPS::TYPE_SYMBOL);
439  case FieldDescriptor::TYPE_STRING: return CLIPS::Value("STRING", CLIPS::TYPE_SYMBOL);
440  case FieldDescriptor::TYPE_MESSAGE: return CLIPS::Value("MESSAGE", CLIPS::TYPE_SYMBOL);
441  case FieldDescriptor::TYPE_BYTES: return CLIPS::Value("BYTES", CLIPS::TYPE_SYMBOL);
442  case FieldDescriptor::TYPE_UINT32: return CLIPS::Value("UINT32", CLIPS::TYPE_SYMBOL);
443  case FieldDescriptor::TYPE_ENUM: return CLIPS::Value("ENUM", CLIPS::TYPE_SYMBOL);
444  case FieldDescriptor::TYPE_SFIXED32: return CLIPS::Value("SFIXED32", CLIPS::TYPE_SYMBOL);
445  case FieldDescriptor::TYPE_SFIXED64: return CLIPS::Value("SFIXED64", CLIPS::TYPE_SYMBOL);
446  case FieldDescriptor::TYPE_SINT32: return CLIPS::Value("SINT32", CLIPS::TYPE_SYMBOL);
447  case FieldDescriptor::TYPE_SINT64: return CLIPS::Value("SINT64", CLIPS::TYPE_SYMBOL);
448  default: return CLIPS::Value("UNKNOWN", CLIPS::TYPE_SYMBOL);
449  }
450 }
451 
452 CLIPS::Value
453 ClipsProtobufCommunicator::clips_pb_has_field(void *msgptr, std::string field_name)
454 {
455  std::shared_ptr<google::protobuf::Message> *m =
456  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
457  if (!*m)
458  return false;
459 
460  const Descriptor * desc = (*m)->GetDescriptor();
461  const FieldDescriptor *field = desc->FindFieldByName(field_name);
462  if (!field)
463  return false;
464 
465  const Reflection *refl = (*m)->GetReflection();
466 
467  if (field->is_repeated()) {
468  return CLIPS::Value((refl->FieldSize(**m, field) > 0) ? "TRUE" : "FALSE", CLIPS::TYPE_SYMBOL);
469  } else if (field->is_optional()) {
470  return CLIPS::Value(refl->HasField(**m, field) ? "TRUE" : "FALSE", CLIPS::TYPE_SYMBOL);
471  } else {
472  return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
473  }
474 }
475 
476 CLIPS::Value
477 ClipsProtobufCommunicator::clips_pb_field_label(void *msgptr, std::string field_name)
478 {
479  std::shared_ptr<google::protobuf::Message> *m =
480  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
481  if (!*m)
482  return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
483 
484  const Descriptor * desc = (*m)->GetDescriptor();
485  const FieldDescriptor *field = desc->FindFieldByName(field_name);
486  if (!field) {
487  return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
488  }
489  switch (field->label()) {
490  case FieldDescriptor::LABEL_OPTIONAL: return CLIPS::Value("OPTIONAL", CLIPS::TYPE_SYMBOL);
491  case FieldDescriptor::LABEL_REQUIRED: return CLIPS::Value("REQUIRED", CLIPS::TYPE_SYMBOL);
492  case FieldDescriptor::LABEL_REPEATED: return CLIPS::Value("REPEATED", CLIPS::TYPE_SYMBOL);
493  default: return CLIPS::Value("UNKNOWN", CLIPS::TYPE_SYMBOL);
494  }
495 }
496 
497 CLIPS::Value
498 ClipsProtobufCommunicator::clips_pb_field_value(void *msgptr, std::string field_name)
499 {
500  std::shared_ptr<google::protobuf::Message> *m =
501  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
502  if (!(m && *m)) {
503  if (logger_) {
504  logger_->log_warn("CLIPS-Protobuf", "Invalid message when setting %s", field_name.c_str());
505  }
506  return CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL);
507  }
508 
509  const Descriptor * desc = (*m)->GetDescriptor();
510  const FieldDescriptor *field = desc->FindFieldByName(field_name);
511  if (!field) {
512  if (logger_) {
513  logger_->log_warn("CLIPS-Protobuf",
514  "Field %s of %s does not exist",
515  field_name.c_str(),
516  (*m)->GetTypeName().c_str());
517  }
518  return CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL);
519  }
520  const Reflection *refl = (*m)->GetReflection();
521  if (field->type() != FieldDescriptor::TYPE_MESSAGE && !refl->HasField(**m, field)) {
522  if (logger_) {
523  logger_->log_warn("CLIPS-Protobuf",
524  "Field %s of %s not set",
525  field_name.c_str(),
526  (*m)->GetTypeName().c_str());
527  }
528  return CLIPS::Value("NOT-SET", CLIPS::TYPE_SYMBOL);
529  }
530  switch (field->type()) {
531  case FieldDescriptor::TYPE_DOUBLE: return CLIPS::Value(refl->GetDouble(**m, field));
532  case FieldDescriptor::TYPE_FLOAT: return CLIPS::Value(refl->GetFloat(**m, field));
533  case FieldDescriptor::TYPE_INT64: return CLIPS::Value(refl->GetInt64(**m, field));
534  case FieldDescriptor::TYPE_UINT64: return CLIPS::Value((long int)refl->GetUInt64(**m, field));
535  case FieldDescriptor::TYPE_INT32: return CLIPS::Value(refl->GetInt32(**m, field));
536  case FieldDescriptor::TYPE_FIXED64: return CLIPS::Value((long int)refl->GetUInt64(**m, field));
537  case FieldDescriptor::TYPE_FIXED32: return CLIPS::Value(refl->GetUInt32(**m, field));
538  case FieldDescriptor::TYPE_BOOL:
539  //Booleans are represented as Symbols in CLIPS
540  if (refl->GetBool(**m, field)) {
541  return CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
542  } else {
543  return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
544  }
545  case FieldDescriptor::TYPE_STRING: return CLIPS::Value(refl->GetString(**m, field));
546  case FieldDescriptor::TYPE_MESSAGE: {
547  const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
548  google::protobuf::Message * mcopy = mfield.New();
549  mcopy->CopyFrom(mfield);
550  void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
551  return CLIPS::Value(ptr);
552  }
553  case FieldDescriptor::TYPE_BYTES: return CLIPS::Value((char *)"bytes");
554  case FieldDescriptor::TYPE_UINT32: return CLIPS::Value(refl->GetUInt32(**m, field));
555  case FieldDescriptor::TYPE_ENUM:
556  return CLIPS::Value(refl->GetEnum(**m, field)->name(), CLIPS::TYPE_SYMBOL);
557  case FieldDescriptor::TYPE_SFIXED32: return CLIPS::Value(refl->GetInt32(**m, field));
558  case FieldDescriptor::TYPE_SFIXED64: return CLIPS::Value(refl->GetInt64(**m, field));
559  case FieldDescriptor::TYPE_SINT32: return CLIPS::Value(refl->GetInt32(**m, field));
560  case FieldDescriptor::TYPE_SINT64: return CLIPS::Value(refl->GetInt64(**m, field));
561  default: throw std::logic_error("Unknown protobuf field type encountered");
562  }
563 }
564 
565 void
566 ClipsProtobufCommunicator::clips_pb_set_field(void * msgptr,
567  std::string field_name,
568  CLIPS::Value value)
569 {
570  std::shared_ptr<google::protobuf::Message> *m =
571  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
572  if (!(m && *m))
573  return;
574 
575  const Descriptor * desc = (*m)->GetDescriptor();
576  const FieldDescriptor *field = desc->FindFieldByName(field_name);
577  if (!field) {
578  if (logger_) {
579  logger_->log_warn("CLIPS-Protobuf", "Could not find field %s", field_name.c_str());
580  }
581  return;
582  }
583  const Reflection *refl = (*m)->GetReflection();
584 
585  try {
586  switch (field->type()) {
587  case FieldDescriptor::TYPE_DOUBLE: refl->SetDouble(m->get(), field, value.as_float()); break;
588  case FieldDescriptor::TYPE_FLOAT: refl->SetFloat(m->get(), field, value.as_float()); break;
589  case FieldDescriptor::TYPE_SFIXED64:
590  case FieldDescriptor::TYPE_SINT64:
591  case FieldDescriptor::TYPE_INT64: refl->SetInt64(m->get(), field, value.as_integer()); break;
592  case FieldDescriptor::TYPE_FIXED64:
593  case FieldDescriptor::TYPE_UINT64: refl->SetUInt64(m->get(), field, value.as_integer()); break;
594  case FieldDescriptor::TYPE_SFIXED32:
595  case FieldDescriptor::TYPE_SINT32:
596  case FieldDescriptor::TYPE_INT32: refl->SetInt32(m->get(), field, value.as_integer()); break;
597  case FieldDescriptor::TYPE_BOOL: refl->SetBool(m->get(), field, (value == "TRUE")); break;
598  case FieldDescriptor::TYPE_STRING: refl->SetString(m->get(), field, value.as_string()); break;
599  case FieldDescriptor::TYPE_MESSAGE: {
600  std::shared_ptr<google::protobuf::Message> *mfrom =
601  static_cast<std::shared_ptr<google::protobuf::Message> *>(value.as_address());
602  Message *mut_msg = refl->MutableMessage(m->get(), field);
603  mut_msg->CopyFrom(**mfrom);
604  delete mfrom;
605  } break;
606  case FieldDescriptor::TYPE_BYTES: break;
607  case FieldDescriptor::TYPE_FIXED32:
608  case FieldDescriptor::TYPE_UINT32: refl->SetUInt32(m->get(), field, value.as_integer()); break;
609  case FieldDescriptor::TYPE_ENUM: {
610  const EnumDescriptor * enumdesc = field->enum_type();
611  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
612  if (enumval) {
613  refl->SetEnum(m->get(), field, enumval);
614  } else {
615  if (logger_) {
616  logger_->log_warn("CLIPS-Protobuf",
617  "%s: cannot set invalid "
618  "enum value '%s' on '%s'",
619  (*m)->GetTypeName().c_str(),
620  value.as_string().c_str(),
621  field_name.c_str());
622  }
623  }
624  } break;
625  default: throw std::logic_error("Unknown protobuf field type encountered");
626  }
627  } catch (std::logic_error &e) {
628  if (logger_) {
629  logger_->log_warn("CLIPS-Protobuf",
630  "Failed to set field %s of %s: %s "
631  "(type %d, as string %s)",
632  field_name.c_str(),
633  (*m)->GetTypeName().c_str(),
634  e.what(),
635  value.type(),
636  to_string(value).c_str());
637  }
638  }
639 }
640 
641 void
642 ClipsProtobufCommunicator::clips_pb_add_list(void * msgptr,
643  std::string field_name,
644  CLIPS::Value value)
645 {
646  std::shared_ptr<google::protobuf::Message> *m =
647  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
648  if (!(m && *m))
649  return;
650 
651  const Descriptor * desc = (*m)->GetDescriptor();
652  const FieldDescriptor *field = desc->FindFieldByName(field_name);
653  if (!field) {
654  if (logger_) {
655  logger_->log_warn("CLIPS-Protobuf", "Could not find field %s", field_name.c_str());
656  }
657  return;
658  }
659  const Reflection *refl = (*m)->GetReflection();
660 
661  try {
662  switch (field->type()) {
663  case FieldDescriptor::TYPE_DOUBLE: refl->AddDouble(m->get(), field, value); break;
664  case FieldDescriptor::TYPE_FLOAT: refl->AddFloat(m->get(), field, value); break;
665  case FieldDescriptor::TYPE_SFIXED64:
666  case FieldDescriptor::TYPE_SINT64:
667  case FieldDescriptor::TYPE_INT64: refl->AddInt64(m->get(), field, value); break;
668  case FieldDescriptor::TYPE_FIXED64:
669  case FieldDescriptor::TYPE_UINT64: refl->AddUInt64(m->get(), field, (long int)value); break;
670  case FieldDescriptor::TYPE_SFIXED32:
671  case FieldDescriptor::TYPE_SINT32:
672  case FieldDescriptor::TYPE_INT32: refl->AddInt32(m->get(), field, value); break;
673  case FieldDescriptor::TYPE_BOOL: refl->AddBool(m->get(), field, (value == "TRUE")); break;
674  case FieldDescriptor::TYPE_STRING: refl->AddString(m->get(), field, value); break;
675  case FieldDescriptor::TYPE_MESSAGE: {
676  std::shared_ptr<google::protobuf::Message> *mfrom =
677  static_cast<std::shared_ptr<google::protobuf::Message> *>(value.as_address());
678  Message *new_msg = refl->AddMessage(m->get(), field);
679  new_msg->CopyFrom(**mfrom);
680  delete mfrom;
681  } break;
682  case FieldDescriptor::TYPE_BYTES: break;
683  case FieldDescriptor::TYPE_FIXED32:
684  case FieldDescriptor::TYPE_UINT32: refl->AddUInt32(m->get(), field, value); break;
685  case FieldDescriptor::TYPE_ENUM: {
686  const EnumDescriptor * enumdesc = field->enum_type();
687  const EnumValueDescriptor *enumval = enumdesc->FindValueByName(value);
688  if (enumval)
689  refl->AddEnum(m->get(), field, enumval);
690  } break;
691  default: throw std::logic_error("Unknown protobuf field type encountered");
692  }
693  } catch (std::logic_error &e) {
694  if (logger_) {
695  logger_->log_warn("CLIPS-Protobuf",
696  "Failed to add field %s of %s: %s",
697  field_name.c_str(),
698  (*m)->GetTypeName().c_str(),
699  e.what());
700  }
701  }
702 }
703 
704 long int
705 ClipsProtobufCommunicator::clips_pb_client_connect(std::string host, int port)
706 {
707  if (port <= 0)
708  return false;
709 
710  ProtobufStreamClient *client = new ProtobufStreamClient(message_register_);
711 
712  long int client_id;
713  {
714  fawkes::MutexLocker lock(&map_mutex_);
715  client_id = ++next_client_id_;
716  clients_[client_id] = client;
717  }
718 
719  client->signal_connected().connect(
720  boost::bind(&ClipsProtobufCommunicator::handle_client_connected, this, client_id));
721  client->signal_disconnected().connect(
722  boost::bind(&ClipsProtobufCommunicator::handle_client_disconnected,
723  this,
724  client_id,
725  boost::asio::placeholders::error));
726  client->signal_received().connect(
727  boost::bind(&ClipsProtobufCommunicator::handle_client_msg, this, client_id, _1, _2, _3));
728  client->signal_receive_failed().connect(boost::bind(
729  &ClipsProtobufCommunicator::handle_client_receive_fail, this, client_id, _1, _2, _3));
730 
731  client->async_connect(host.c_str(), port);
732  return CLIPS::Value(client_id);
733 }
734 
735 void
736 ClipsProtobufCommunicator::clips_pb_send(long int client_id, void *msgptr)
737 {
738  std::shared_ptr<google::protobuf::Message> *m =
739  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
740  if (!(m && *m)) {
741  if (logger_) {
742  logger_->log_warn("CLIPS-Protobuf", "Cannot send to %li: invalid message", client_id);
743  }
744  return;
745  }
746 
747  try {
748  fawkes::MutexLocker lock(&map_mutex_);
749 
750  if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
751  //printf("***** SENDING via SERVER\n");
752  server_->send(server_clients_[client_id], *m);
753  sig_server_sent_(server_clients_[client_id], *m);
754  } else if (clients_.find(client_id) != clients_.end()) {
755  //printf("***** SENDING via CLIENT\n");
756  clients_[client_id]->send(*m);
757  std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
758  sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
759  } else if (peers_.find(client_id) != peers_.end()) {
760  //printf("***** SENDING via CLIENT\n");
761  peers_[client_id]->send(*m);
762  sig_peer_sent_(client_id, *m);
763  } else {
764  //printf("Client ID %li is unknown, cannot send message of type %s\n",
765  // client_id, (*m)->GetTypeName().c_str());
766  }
767  } catch (google::protobuf::FatalException &e) {
768  if (logger_) {
769  logger_->log_warn("CLIPS-Profobuf",
770  "Failed to send message of type %s: %s",
771  (*m)->GetTypeName().c_str(),
772  e.what());
773  }
774  } catch (fawkes::Exception &e) {
775  if (logger_) {
776  logger_->log_warn("CLIPS-Protobuf",
777  "Failed to send message of type %s: %s",
778  (*m)->GetTypeName().c_str(),
779  e.what_no_backtrace());
780  }
781  } catch (std::runtime_error &e) {
782  if (logger_) {
783  logger_->log_warn("CLIPS-Protobuf",
784  "Failed to send message of type %s: %s",
785  (*m)->GetTypeName().c_str(),
786  e.what());
787  }
788  }
789 }
790 
791 std::string
792 ClipsProtobufCommunicator::clips_pb_tostring(void *msgptr)
793 {
794  std::shared_ptr<google::protobuf::Message> *m =
795  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
796  if (!(m && *m)) {
797  if (logger_) {
798  logger_->log_warn("CLIPS-Protobuf", "Cannot convert message to string: invalid message");
799  }
800  return "";
801  }
802 
803  return (*m)->DebugString();
804 }
805 
806 void
807 ClipsProtobufCommunicator::clips_pb_broadcast(long int peer_id, void *msgptr)
808 {
809  std::shared_ptr<google::protobuf::Message> *m =
810  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
811  if (!(m && *m)) {
812  if (logger_) {
813  logger_->log_warn("CLIPS-Protobuf", "Cannot send broadcast: invalid message");
814  }
815  return;
816  }
817 
818  fawkes::MutexLocker lock(&map_mutex_);
819  if (peers_.find(peer_id) == peers_.end())
820  return;
821 
822  //logger_->log_info("CLIPS-Protobuf", "Broadcasting %s", (*m)->GetTypeName().c_str());
823  try {
824  peers_[peer_id]->send(*m);
825  } catch (google::protobuf::FatalException &e) {
826  if (logger_) {
827  logger_->log_warn("CLIPS-Protobuf",
828  "Failed to broadcast message of type %s: %s",
829  (*m)->GetTypeName().c_str(),
830  e.what());
831  }
832  } catch (fawkes::Exception &e) {
833  if (logger_) {
834  logger_->log_warn("CLIPS-Protobuf",
835  "Failed to broadcast message of type %s: %s",
836  (*m)->GetTypeName().c_str(),
837  e.what_no_backtrace());
838  }
839  } catch (std::runtime_error &e) {
840  if (logger_) {
841  logger_->log_warn("CLIPS-Protobuf",
842  "Failed to broadcast message of type %s: %s",
843  (*m)->GetTypeName().c_str(),
844  e.what());
845  }
846  }
847 
848  sig_peer_sent_(peer_id, *m);
849 }
850 
851 void
852 ClipsProtobufCommunicator::clips_pb_disconnect(long int client_id)
853 {
854  //logger_->log_info("CLIPS-Protobuf", "Disconnecting client %li", client_id);
855 
856  try {
857  fawkes::MutexLocker lock(&map_mutex_);
858 
859  if (server_clients_.find(client_id) != server_clients_.end()) {
860  protobuf_comm::ProtobufStreamServer::ClientID srv_client = server_clients_[client_id];
861  server_->disconnect(srv_client);
862  server_clients_.erase(client_id);
863  rev_server_clients_.erase(srv_client);
864  } else if (clients_.find(client_id) != clients_.end()) {
865  delete clients_[client_id];
866  clients_.erase(client_id);
867  }
868  } catch (std::runtime_error &e) {
869  if (logger_) {
870  logger_->log_warn("CLIPS-Protobuf",
871  "Failed to disconnect from client %li: %s",
872  client_id,
873  e.what());
874  }
875  }
876 }
877 
878 CLIPS::Values
879 ClipsProtobufCommunicator::clips_pb_field_list(void *msgptr, std::string field_name)
880 {
881  std::shared_ptr<google::protobuf::Message> *m =
882  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
883  if (!(m && *m))
884  return CLIPS::Values(1, CLIPS::Value("INVALID-MESSAGE", CLIPS::TYPE_SYMBOL));
885 
886  const Descriptor * desc = (*m)->GetDescriptor();
887  const FieldDescriptor *field = desc->FindFieldByName(field_name);
888  if (!field) {
889  return CLIPS::Values(1, CLIPS::Value("DOES-NOT-EXIST", CLIPS::TYPE_SYMBOL));
890  }
891  if (field->label() == FieldDescriptor::LABEL_REQUIRED
892  || field->label() == FieldDescriptor::LABEL_OPTIONAL) {
893  CLIPS::Values rv(1, clips_pb_field_value(msgptr, field_name));
894  return rv;
895  }
896 
897  const Reflection *refl = (*m)->GetReflection();
898  int field_size = refl->FieldSize(**m, field);
899  CLIPS::Values rv(field_size);
900  for (int i = 0; i < field_size; ++i) {
901  switch (field->type()) {
902  case FieldDescriptor::TYPE_DOUBLE:
903  rv[i] = CLIPS::Value(refl->GetRepeatedDouble(**m, field, i));
904  break;
905  case FieldDescriptor::TYPE_FLOAT:
906  rv[i] = CLIPS::Value(refl->GetRepeatedFloat(**m, field, i));
907  break;
908  break;
909  case FieldDescriptor::TYPE_UINT64:
910  case FieldDescriptor::TYPE_FIXED64:
911  rv[i] = CLIPS::Value((long int)refl->GetRepeatedUInt64(**m, field, i));
912  break;
913  case FieldDescriptor::TYPE_UINT32:
914  case FieldDescriptor::TYPE_FIXED32:
915  rv[i] = CLIPS::Value(refl->GetRepeatedUInt32(**m, field, i));
916  break;
917  case FieldDescriptor::TYPE_BOOL:
918  //Booleans are represented as Symbols in CLIPS
919  if (refl->GetRepeatedBool(**m, field, i)) {
920  rv[i] = CLIPS::Value("TRUE", CLIPS::TYPE_SYMBOL);
921  } else {
922  rv[i] = CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
923  }
924  break;
925  case FieldDescriptor::TYPE_STRING:
926  rv[i] = CLIPS::Value(refl->GetRepeatedString(**m, field, i));
927  break;
928  case FieldDescriptor::TYPE_MESSAGE: {
929  const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
930  google::protobuf::Message * mcopy = msg.New();
931  mcopy->CopyFrom(msg);
932  void *ptr = new std::shared_ptr<google::protobuf::Message>(mcopy);
933  rv[i] = CLIPS::Value(ptr);
934  } break;
935  case FieldDescriptor::TYPE_BYTES:
936  rv[i] = CLIPS::Value((char *)"BYTES", CLIPS::TYPE_SYMBOL);
937  break;
938  case FieldDescriptor::TYPE_ENUM:
939  rv[i] = CLIPS::Value(refl->GetRepeatedEnum(**m, field, i)->name(), CLIPS::TYPE_SYMBOL);
940  break;
941  case FieldDescriptor::TYPE_SFIXED32:
942  case FieldDescriptor::TYPE_INT32:
943  case FieldDescriptor::TYPE_SINT32:
944  rv[i] = CLIPS::Value(refl->GetRepeatedInt32(**m, field, i));
945  break;
946  case FieldDescriptor::TYPE_SFIXED64:
947  case FieldDescriptor::TYPE_SINT64:
948  case FieldDescriptor::TYPE_INT64:
949  rv[i] = CLIPS::Value(refl->GetRepeatedInt64(**m, field, i));
950  break;
951  default: throw std::logic_error("Unknown protobuf field type encountered");
952  }
953  }
954 
955  return rv;
956 }
957 
958 CLIPS::Value
959 ClipsProtobufCommunicator::clips_pb_field_is_list(void *msgptr, std::string field_name)
960 {
961  std::shared_ptr<google::protobuf::Message> *m =
962  static_cast<std::shared_ptr<google::protobuf::Message> *>(msgptr);
963  if (!(m && *m))
964  return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
965 
966  const Descriptor * desc = (*m)->GetDescriptor();
967  const FieldDescriptor *field = desc->FindFieldByName(field_name);
968  if (!field)
969  return CLIPS::Value("FALSE", CLIPS::TYPE_SYMBOL);
970  return CLIPS::Value(field->is_repeated() ? "TRUE" : "FALSE", CLIPS::TYPE_SYMBOL);
971 }
972 
973 void
974 ClipsProtobufCommunicator::clips_assert_message(std::pair<std::string, unsigned short> &endpoint,
975  uint16_t comp_id,
976  uint16_t msg_type,
977  std::shared_ptr<google::protobuf::Message> &msg,
978  ClipsProtobufCommunicator::ClientType ct,
979  unsigned int client_id)
980 {
981  CLIPS::Template::pointer temp = clips_->get_template("protobuf-msg");
982  if (temp) {
983  struct timeval tv;
984  gettimeofday(&tv, 0);
985  void * ptr = new std::shared_ptr<google::protobuf::Message>(msg);
986  CLIPS::Fact::pointer fact = CLIPS::Fact::create(*clips_, temp);
987  fact->set_slot("type", msg->GetTypeName());
988  fact->set_slot("comp-id", comp_id);
989  fact->set_slot("msg-type", msg_type);
990  fact->set_slot("rcvd-via",
991  CLIPS::Value((ct == CT_PEER) ? "BROADCAST" : "STREAM", CLIPS::TYPE_SYMBOL));
992  CLIPS::Values rcvd_at(2, CLIPS::Value(CLIPS::TYPE_INTEGER));
993  rcvd_at[0] = tv.tv_sec;
994  rcvd_at[1] = tv.tv_usec;
995  fact->set_slot("rcvd-at", rcvd_at);
996  CLIPS::Values host_port(2, CLIPS::Value(CLIPS::TYPE_STRING));
997  host_port[0] = endpoint.first;
998  host_port[1] = CLIPS::Value(endpoint.second);
999  fact->set_slot("rcvd-from", host_port);
1000  fact->set_slot("client-type",
1001  CLIPS::Value(ct == CT_CLIENT ? "CLIENT" : (ct == CT_SERVER ? "SERVER" : "PEER"),
1002  CLIPS::TYPE_SYMBOL));
1003  fact->set_slot("client-id", client_id);
1004  fact->set_slot("ptr", CLIPS::Value(ptr));
1005  CLIPS::Fact::pointer new_fact = clips_->assert_fact(fact);
1006 
1007  if (!new_fact) {
1008  if (logger_) {
1009  logger_->log_warn("CLIPS-Protobuf", "Asserting protobuf-msg fact failed");
1010  }
1011  delete static_cast<std::shared_ptr<google::protobuf::Message> *>(ptr);
1012  }
1013  } else {
1014  if (logger_) {
1015  logger_->log_warn("CLIPS-Protobuf", "Did not get template, did you load protobuf.clp?");
1016  }
1017  }
1018 }
1019 
1020 void
1021 ClipsProtobufCommunicator::handle_server_client_connected(ProtobufStreamServer::ClientID client,
1022  boost::asio::ip::tcp::endpoint &endpoint)
1023 {
1024  long int client_id = -1;
1025  {
1026  fawkes::MutexLocker lock(&map_mutex_);
1027  client_id = ++next_client_id_;
1028  client_endpoints_[client_id] = std::make_pair(endpoint.address().to_string(), endpoint.port());
1029  server_clients_[client_id] = client;
1030  rev_server_clients_[client] = client_id;
1031  }
1032 
1033  fawkes::MutexLocker lock(&clips_mutex_);
1034  clips_->assert_fact_f("(protobuf-server-client-connected %li %s %u)",
1035  client_id,
1036  endpoint.address().to_string().c_str(),
1037  endpoint.port());
1038 }
1039 
1040 void
1041 ClipsProtobufCommunicator::handle_server_client_disconnected(ProtobufStreamServer::ClientID client,
1042  const boost::system::error_code &error)
1043 {
1044  long int client_id = -1;
1045  {
1046  fawkes::MutexLocker lock(&map_mutex_);
1047  RevServerClientMap::iterator c;
1048  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1049  client_id = c->second;
1050  rev_server_clients_.erase(c);
1051  server_clients_.erase(client_id);
1052  }
1053  }
1054 
1055  if (client_id >= 0) {
1056  fawkes::MutexLocker lock(&clips_mutex_);
1057  clips_->assert_fact_f("(protobuf-server-client-disconnected %li)", client_id);
1058  }
1059 }
1060 
1061 /** Handle message that came from a client.
1062  * @param client client ID
1063  * @param component_id component the message was addressed to
1064  * @param msg_type type of the message
1065  * @param msg the message
1066  */
1067 void
1068 ClipsProtobufCommunicator::handle_server_client_msg(ProtobufStreamServer::ClientID client,
1069  uint16_t component_id,
1070  uint16_t msg_type,
1071  std::shared_ptr<google::protobuf::Message> msg)
1072 {
1073  fawkes::MutexLocker lock(&clips_mutex_);
1074  fawkes::MutexLocker lock2(&map_mutex_);
1075  RevServerClientMap::iterator c;
1076  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1077  clips_assert_message(
1078  client_endpoints_[c->second], component_id, msg_type, msg, CT_SERVER, c->second);
1079  }
1080 }
1081 
1082 /** Handle server reception failure
1083  * @param client client ID
1084  * @param component_id component the message was addressed to
1085  * @param msg_type type of the message
1086  * @param msg the message string
1087  */
1088 void
1089 ClipsProtobufCommunicator::handle_server_client_fail(ProtobufStreamServer::ClientID client,
1090  uint16_t component_id,
1091  uint16_t msg_type,
1092  std::string msg)
1093 {
1094  fawkes::MutexLocker lock(&map_mutex_);
1095  RevServerClientMap::iterator c;
1096  if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1097  fawkes::MutexLocker lock(&clips_mutex_);
1098  clips_->assert_fact_f("(protobuf-server-receive-failed (comp-id %u) (msg-type %u) "
1099  "(rcvd-via STREAM) (client-id %li) (message \"%s\") "
1100  "(rcvd-from (\"%s\" %u)))",
1101  component_id,
1102  msg_type,
1103  c->second,
1104  msg.c_str(),
1105  client_endpoints_[c->second].first.c_str(),
1106  client_endpoints_[c->second].second);
1107  }
1108 }
1109 
1110 /** Handle message that came from a peer/robot
1111  * @param endpoint the endpoint from which the message was received
1112  * @param component_id component the message was addressed to
1113  * @param msg_type type of the message
1114  * @param msg the message
1115  */
1116 void
1117 ClipsProtobufCommunicator::handle_peer_msg(long int peer_id,
1118  boost::asio::ip::udp::endpoint & endpoint,
1119  uint16_t component_id,
1120  uint16_t msg_type,
1121  std::shared_ptr<google::protobuf::Message> msg)
1122 {
1123  fawkes::MutexLocker lock(&clips_mutex_);
1124  std::pair<std::string, unsigned short> endpp =
1125  std::make_pair(endpoint.address().to_string(), endpoint.port());
1126  clips_assert_message(endpp, component_id, msg_type, msg, CT_PEER, peer_id);
1127 }
1128 
1129 /** Handle error during peer message processing.
1130  * @param endpoint endpoint of incoming message
1131  * @param msg error message
1132  */
1133 void
1134 ClipsProtobufCommunicator::handle_peer_recv_error(long int peer_id,
1135  boost::asio::ip::udp::endpoint &endpoint,
1136  std::string msg)
1137 {
1138  if (logger_) {
1139  logger_->log_warn("CLIPS-Protobuf",
1140  "Failed to receive peer message from %s:%u: %s",
1141  endpoint.address().to_string().c_str(),
1142  endpoint.port(),
1143  msg.c_str());
1144  }
1145 }
1146 
1147 /** Handle error during peer message processing.
1148  * @param msg error message
1149  */
1150 void
1151 ClipsProtobufCommunicator::handle_peer_send_error(long int peer_id, std::string msg)
1152 {
1153  if (logger_) {
1154  logger_->log_warn("CLIPS-Protobuf", "Failed to send peer message: %s", msg.c_str());
1155  }
1156 }
1157 
1158 void
1159 ClipsProtobufCommunicator::handle_client_connected(long int client_id)
1160 {
1161  fawkes::MutexLocker lock(&clips_mutex_);
1162  clips_->assert_fact_f("(protobuf-client-connected %li)", client_id);
1163 }
1164 
1165 void
1166 ClipsProtobufCommunicator::handle_client_disconnected(long int client_id,
1167  const boost::system::error_code &error)
1168 {
1169  fawkes::MutexLocker lock(&clips_mutex_);
1170  clips_->assert_fact_f("(protobuf-client-disconnected %li)", client_id);
1171 }
1172 
1173 void
1174 ClipsProtobufCommunicator::handle_client_msg(long int client_id,
1175  uint16_t comp_id,
1176  uint16_t msg_type,
1177  std::shared_ptr<google::protobuf::Message> msg)
1178 {
1179  fawkes::MutexLocker lock(&clips_mutex_);
1180  std::pair<std::string, unsigned short> endpp = std::make_pair(std::string(), 0);
1181  clips_assert_message(endpp, comp_id, msg_type, msg, CT_CLIENT, client_id);
1182 }
1183 
1184 void
1185 ClipsProtobufCommunicator::handle_client_receive_fail(long int client_id,
1186  uint16_t comp_id,
1187  uint16_t msg_type,
1188  std::string msg)
1189 {
1190  fawkes::MutexLocker lock(&clips_mutex_);
1191  clips_->assert_fact_f("(protobuf-receive-failed (client-id %li) (rcvd-via STREAM) "
1192  "(comp-id %u) (msg-type %u) (message \"%s\"))",
1193  client_id,
1194  comp_id,
1195  msg_type,
1196  msg.c_str());
1197 }
1198 
1199 std::string
1200 ClipsProtobufCommunicator::to_string(const CLIPS::Value &v)
1201 {
1202  switch (v.type()) {
1203  case CLIPS::TYPE_UNKNOWN: return "Unknown Type";
1204  case CLIPS::TYPE_FLOAT: return std::to_string(v.as_float());
1205  case CLIPS::TYPE_INTEGER: return std::to_string(v.as_integer());
1206  case CLIPS::TYPE_SYMBOL:
1207  case CLIPS::TYPE_INSTANCE_NAME:
1208  case CLIPS::TYPE_STRING: return v.as_string();
1209  case CLIPS::TYPE_INSTANCE_ADDRESS:
1210  case CLIPS::TYPE_EXTERNAL_ADDRESS: return boost::str(boost::format("%p") % v.as_address());
1211  }
1212  return "Implicit unknown type";
1213 }
1214 
1215 } // end namespace protobuf_clips
void enable_server(int port)
Enable protobuf stream server.
Mutex locking helper.
Definition: mutex_locker.h:33
void disable_server()
Disable protobuf stream server.
unsigned int ClientID
ID to identify connected clients.
Definition: server.h:65
boost::signals2::signal< void(uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
Definition: client.h:93
Register to map msg type numbers to Protobuf messages.
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
Definition: server.h:104
boost::signals2::signal< void(uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
Definition: client.h:102
void disconnect(ClientID client)
Disconnect specific client.
Definition: server.cpp:447
ClipsProtobufCommunicator(CLIPS::Environment *env, fawkes::Mutex &env_mutex, fawkes::Logger *logger=NULL)
Constructor.
boost::signals2::signal< void(ClientID, const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when a client has disconnected.
Definition: server.h:131
Communicate by broadcasting protobuf messages.
Definition: peer.h:56
signal_send_error_type & signal_send_error()
Signal that is invoked when sending a message failed.
Definition: peer.h:173
Base class for exceptions in Fawkes.
Definition: exception.h:35
signal_received_type & signal_received()
Signal that is invoked when a message has been received.
Definition: peer.h:144
std::shared_ptr< google::protobuf::Message > new_message_for(uint16_t component_id, uint16_t msg_type)
Create a new message instance.
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
Definition: server.h:113
boost::signals2::signal< void()> & signal_connected()
Signal that is invoked when the connection has been established.
Definition: client.h:111
virtual const char * what_no_backtrace() const
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:663
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void async_connect(const char *host, unsigned short port)
Asynchronous connect.
Definition: client.cpp:154
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
void send(ClientID client, uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the given client.
Definition: server.cpp:319
Stream server for protobuf message transmission.
Definition: server.h:61
signal_recv_error_type & signal_recv_error()
Signal that is invoked when receiving a message failed.
Definition: peer.h:164
boost::signals2::signal< void(ClientID, boost::asio::ip::tcp::endpoint &)> & signal_connected()
Signal that is invoked when a new client has connected.
Definition: server.h:122
Mutex mutual exclusion lock.
Definition: mutex.h:32
Stream client for protobuf message transmission.
Definition: client.h:55
void add_message_type(std::string msg_type)
Add a message type from generated pool.
boost::signals2::signal< void(const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when the connection is closed.
Definition: client.h:120
Interface for logging.
Definition: logger.h:41