Ignition Transport

API Reference

4.0.0
Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
20 
21 #ifdef _WIN32
22  // For socket(), connect(), send(), and recv().
23  #include <Winsock2.h>
24  #include <Ws2def.h>
25  #include <Ws2ipdef.h>
26  #include <Ws2tcpip.h>
27  // Type used for raw data on this platform.
28  using raw_type = char;
29 #else
30  // For data types
31  #include <sys/types.h>
32  // For socket(), connect(), send(), and recv()
33  #include <sys/socket.h>
34  // For gethostbyname()
35  #include <netdb.h>
36  // For inet_addr()
37  #include <arpa/inet.h>
38  // For close()
39  #include <unistd.h>
40  // For sockaddr_in
41  #include <netinet/in.h>
42  // Type used for raw data on this platform
43  using raw_type = void;
44 #endif
45 
46 #ifdef _WIN32
47  #pragma warning(push, 0)
48 #endif
49 #ifdef _WIN32
50  #pragma warning(pop)
51  // Suppress "decorated name length exceeded" warning in STL.
52  #pragma warning(disable: 4503)
53  // Suppress "deprecated API" warnings in WINSOCK.
54  #pragma warning(disable: 4996)
55 #endif
56 
57 #include <algorithm>
58 #include <condition_variable>
59 #include <map>
60 #include <memory>
61 #include <mutex>
62 #include <string>
63 #include <thread>
64 #include <vector>
65 
66 #include "ignition/transport/Export.hh"
73 
74 namespace ignition
75 {
76  namespace transport
77  {
/// \brief Poll a set of sockets.
/// NOTE(review): the implementation is not visible in this header. The only
/// caller here (Discovery::RecvMessages) reads from the first socket when
/// this returns true, so a true result presumably means "data is ready to be
/// read" - confirm against the definition.
/// \param[in] _sockets Socket descriptors to poll.
/// \param[in] _timeout Poll timeout. The caller in this file passes a value
/// computed in milliseconds.
/// \return See the review note above.
83  bool IGNITION_TRANSPORT_VISIBLE pollSockets(
84  const std::vector<int> &_sockets,
85  const int _timeout);
86 
95  template<typename Pub>
96  class Discovery
97  {
103  public: Discovery(const std::string &_pUuid,
104  const int _port,
105  const bool _verbose = false)
106  : port(_port),
107  hostAddr(determineHost()),
108  pUuid(_pUuid),
109  silenceInterval(kDefSilenceInterval),
110  activityInterval(kDefActivityInterval),
111  heartbeatInterval(kDefHeartbeatInterval),
112  connectionCb(nullptr),
113  disconnectionCb(nullptr),
114  verbose(_verbose),
115  initialized(false),
116  numHeartbeatsUninitialized(0),
117  exit(false),
118  enabled(false)
119  {
120  std::string ignIp;
121  if (env("IGN_IP", ignIp) && !ignIp.empty())
122  this->hostInterfaces = {ignIp};
123  else
124  {
125  // Get the list of network interfaces in this host.
126  this->hostInterfaces = determineInterfaces();
127  }
128 
129 #ifdef _WIN32
130  WORD wVersionRequested;
131  WSADATA wsaData;
132 
133  // Request WinSock v2.2.
134  wVersionRequested = MAKEWORD(2, 2);
135  // Load WinSock DLL.
136  if (WSAStartup(wVersionRequested, &wsaData) != 0)
137  {
138  std::cerr << "Unable to load WinSock DLL" << std::endl;
139  return;
140  }
141 #endif
142  for (const auto &netIface : this->hostInterfaces)
143  {
144  auto succeed = this->RegisterNetIface(netIface);
145 
146  // If the IP address that we're selecting as the main IP address of
147  // the host is invalid, we change it to 127.0.0.1 .
148  // This is probably because IGN_IP is set to a wrong value.
149  if (netIface == this->hostAddr && !succeed)
150  {
151  this->RegisterNetIface("127.0.0.1");
152  std::cerr << "Did you set the environment variable IGN_IP with a "
153  << "correct IP address? " << std::endl
154  << " [" << netIface << "] seems an invalid local IP "
155  << "address." << std::endl
156  << " Using 127.0.0.1 as hostname." << std::endl;
157  this->hostAddr = "127.0.0.1";
158  }
159  }
160 
161  // Socket option: SO_REUSEADDR. This options is used only for receiving
162  // data. We can reuse the same socket for receiving multicast data from
163  // multiple interfaces. We will use the socket at position 0 for
164  // receiving data.
165  int reuseAddr = 1;
166  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
167  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
168  {
169  std::cerr << "Error setting socket option (SO_REUSEADDR)."
170  << std::endl;
171  return;
172  }
173 
174 #ifdef SO_REUSEPORT
175  // Socket option: SO_REUSEPORT. This options is used only for receiving
176  // data. We can reuse the same socket for receiving multicast data from
177  // multiple interfaces. We will use the socket at position 0 for
178  // receiving data.
179  int reusePort = 1;
180  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
181  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
182  {
183  std::cerr << "Error setting socket option (SO_REUSEPORT)."
184  << std::endl;
185  return;
186  }
187 #endif
188  // Bind the first socket to the discovery port.
189  sockaddr_in localAddr;
190  memset(&localAddr, 0, sizeof(localAddr));
191  localAddr.sin_family = AF_INET;
192  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
193  localAddr.sin_port = htons(static_cast<u_short>(this->port));
194 
195  if (bind(this->sockets.at(0),
196  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
197  {
198  std::cerr << "Binding to a local port failed." << std::endl;
199  return;
200  }
201 
202  // Set 'mcastAddr' to the multicast discovery group.
203  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
204  this->mcastAddr.sin_family = AF_INET;
205  this->mcastAddr.sin_addr.s_addr =
206  inet_addr(this->kMulticastGroup.c_str());
207  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
208 
209  if (this->verbose)
210  this->PrintCurrentState();
211  }
212 
214  public: virtual ~Discovery()
215  {
216  // Tell the service thread to terminate.
217  this->exitMutex.lock();
218  this->exit = true;
219  this->exitMutex.unlock();
220 
221  // Wait for the service threads to finish before exit.
222  if (this->threadReception.joinable())
223  this->threadReception.join();
224 
225  // Broadcast a BYE message to trigger the remote cancellation of
226  // all our advertised topics.
227  this->SendMsg(ByeType,
228  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
229 
230  // Close sockets.
231  for (const auto &sock : this->sockets)
232  {
233 #ifdef _WIN32
234  closesocket(sock);
235  WSACleanup();
236 #else
237  close(sock);
238 #endif
239  }
240  }
241 
245  public: void Start()
246  {
247  {
248  std::lock_guard<std::mutex> lock(this->mutex);
249 
250  // The service is already running.
251  if (this->enabled)
252  return;
253 
254  this->enabled = true;
255  }
256 
257  auto now = std::chrono::steady_clock::now();
258  this->timeNextHeartbeat = now;
259  this->timeNextActivity = now;
260 
261  // Start the thread that receives discovery information.
262  this->threadReception = std::thread(&Discovery::RecvMessages, this);
263  }
264 
269  public: bool Advertise(const Pub &_publisher)
270  {
271  {
272  std::lock_guard<std::mutex> lock(this->mutex);
273 
274  if (!this->enabled)
275  return false;
276 
277  // Add the addressing information (local publisher).
278  if (!this->info.AddPublisher(_publisher))
279  return false;
280  }
281 
282  // Only advertise a message outside this process if the scope
283  // is not 'Process'
284  if (_publisher.Options().Scope() != Scope_t::PROCESS)
285  this->SendMsg(AdvType, _publisher);
286 
287  return true;
288  }
289 
300  public: bool Discover(const std::string &_topic) const
301  {
303  bool found;
305 
306  {
307  std::lock_guard<std::mutex> lock(this->mutex);
308 
309  if (!this->enabled)
310  return false;
311 
312  cb = this->connectionCb;
313  }
314 
315  Pub pub;
316  pub.SetTopic(_topic);
317  pub.SetPUuid(this->pUuid);
318 
319  // Send a discovery request.
320  this->SendMsg(SubType, pub);
321 
322  {
323  std::lock_guard<std::mutex> lock(this->mutex);
324  found = this->info.Publishers(_topic, addresses);
325  }
326 
327  if (found)
328  {
329  // I already have information about this topic.
330  for (const auto &proc : addresses)
331  {
332  for (const auto &node : proc.second)
333  {
334  if (cb)
335  {
336  // Execute the user's callback for a service request. Notice
337  // that we only execute one callback for preventing receive
338  // multiple service responses for a single request.
339  cb(node);
340  }
341  }
342  }
343  }
344 
345  return true;
346  }
347 
350  public: const TopicStorage<Pub> &Info() const
351  {
352  std::lock_guard<std::mutex> lock(this->mutex);
353  return this->info;
354  }
355 
360  public: bool Publishers(const std::string &_topic,
361  Addresses_M<Pub> &_publishers) const
362  {
363  std::lock_guard<std::mutex> lock(this->mutex);
364  return this->info.Publishers(_topic, _publishers);
365  }
366 
374  public: bool Unadvertise(const std::string &_topic,
375  const std::string &_nUuid)
376  {
377  Pub inf;
378  {
379  std::lock_guard<std::mutex> lock(this->mutex);
380 
381  if (!this->enabled)
382  return false;
383 
384  // Don't do anything if the topic is not advertised by any of my nodes
385  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
386  return true;
387 
388  // Remove the topic information.
389  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
390  }
391 
392  // Only unadvertise a message outside this process if the scope
393  // is not 'Process'.
394  if (inf.Options().Scope() != Scope_t::PROCESS)
395  this->SendMsg(UnadvType, inf);
396 
397  return true;
398  }
399 
402  public: std::string HostAddr() const
403  {
404  std::lock_guard<std::mutex> lock(this->mutex);
405  return this->hostAddr;
406  }
407 
412  public: unsigned int ActivityInterval() const
413  {
414  std::lock_guard<std::mutex> lock(this->mutex);
415  return this->activityInterval;
416  }
417 
423  public: unsigned int HeartbeatInterval() const
424  {
425  std::lock_guard<std::mutex> lock(this->mutex);
426  return this->heartbeatInterval;
427  }
428 
433  public: unsigned int SilenceInterval() const
434  {
435  std::lock_guard<std::mutex> lock(this->mutex);
436  return this->silenceInterval;
437  }
438 
442  public: void SetActivityInterval(const unsigned int _ms)
443  {
444  std::lock_guard<std::mutex> lock(this->mutex);
445  this->activityInterval = _ms;
446  }
447 
451  public: void SetHeartbeatInterval(const unsigned int _ms)
452  {
453  std::lock_guard<std::mutex> lock(this->mutex);
454  this->heartbeatInterval = _ms;
455  }
456 
460  public: void SetSilenceInterval(const unsigned int _ms)
461  {
462  std::lock_guard<std::mutex> lock(this->mutex);
463  this->silenceInterval = _ms;
464  }
465 
471  {
472  std::lock_guard<std::mutex> lock(this->mutex);
473  this->connectionCb = _cb;
474  }
475 
481  {
482  std::lock_guard<std::mutex> lock(this->mutex);
483  this->disconnectionCb = _cb;
484  }
485 
487  public: void PrintCurrentState() const
488  {
489  std::lock_guard<std::mutex> lock(this->mutex);
490 
491  std::cout << "---------------" << std::endl;
492  std::cout << std::boolalpha << "Enabled: "
493  << this->enabled << std::endl;
494  std::cout << "Discovery state" << std::endl;
495  std::cout << "\tUUID: " << this->pUuid << std::endl;
496  std::cout << "Settings" << std::endl;
497  std::cout << "\tActivity: " << this->activityInterval
498  << " ms." << std::endl;
499  std::cout << "\tHeartbeat: " << this->heartbeatInterval
500  << "ms." << std::endl;
501  std::cout << "\tSilence: " << this->silenceInterval
502  << " ms." << std::endl;
503  std::cout << "Known information:" << std::endl;
504  this->info.Print();
505 
506  // Used to calculate the elapsed time.
508 
509  std::cout << "Activity" << std::endl;
510  if (this->activity.empty())
511  std::cout << "\t<empty>" << std::endl;
512  else
513  {
514  for (auto &proc : this->activity)
515  {
516  // Elapsed time since the last update from this publisher.
517  std::chrono::duration<double> elapsed = now - proc.second;
518 
519  std::cout << "\t" << proc.first << std::endl;
520  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
521  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
522  << std::endl;
523  }
524  }
525  std::cout << "---------------" << std::endl;
526  }
527 
530  public: void TopicList(std::vector<std::string> &_topics) const
531  {
532  this->WaitForInit();
533  std::lock_guard<std::mutex> lock(this->mutex);
534  this->info.TopicList(_topics);
535  }
536 
539  public: void WaitForInit() const
540  {
541  std::unique_lock<std::mutex> lk(this->mutex);
542 
543  if (!this->initialized)
544  {
545  this->initializedCv.wait(lk, [this]{return this->initialized;});
546  }
547  }
548 
552  private: void UpdateActivity()
553  {
555 
556  std::lock_guard<std::mutex> lock(this->mutex);
557 
558  if (now < this->timeNextActivity)
559  return;
560 
561  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
562  {
563  // Elapsed time since the last update from this publisher.
564  auto elapsed = now - it->second;
565 
566  // This publisher has expired.
567  if (std::chrono::duration_cast<std::chrono::milliseconds>
568  (elapsed).count() > this->silenceInterval)
569  {
570  // Remove all the info entries for this process UUID.
571  this->info.DelPublishersByProc(it->first);
572 
573  // Notify without topic information. This is useful to inform the
574  // client that a remote node is gone, even if we were not
575  // interested in its topics.
576  Pub publisher;
577  publisher.SetPUuid(it->first);
578  this->disconnectionCb(publisher);
579 
580  // Remove the activity entry.
581  this->activity.erase(it++);
582  }
583  else
584  ++it;
585  }
586 
587  this->timeNextActivity = std::chrono::steady_clock::now() +
588  std::chrono::milliseconds(this->activityInterval);
589  }
590 
592  private: void UpdateHeartbeat()
593  {
595 
596  {
597  std::lock_guard<std::mutex> lock(this->mutex);
598 
599  if (now < this->timeNextHeartbeat)
600  return;
601  }
602 
603  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
604  this->SendMsg(HeartbeatType, pub);
605 
607  {
608  std::lock_guard<std::mutex> lock(this->mutex);
609 
610  // Re-advertise topics that are advertised inside this process.
611  this->info.PublishersByProc(this->pUuid, nodes);
612  }
613 
614  for (const auto &topic : nodes)
615  {
616  for (const auto &node : topic.second)
617  this->SendMsg(AdvType, node);
618  }
619 
620  {
621  std::lock_guard<std::mutex> lock(this->mutex);
622  if (!this->initialized)
623  {
624  ++this->numHeartbeatsUninitialized;
625  if (this->numHeartbeatsUninitialized == 2)
626  {
627  // We consider the discovery initialized after two cycles of
628  // heartbeats sent.
629  this->initialized = true;
630 
631  // Notify anyone waiting for the initialization phase to finish.
632  this->initializedCv.notify_all();
633  }
634  }
635 
636  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
637  std::chrono::milliseconds(this->heartbeatInterval);
638  }
639  }
640 
650  private: int NextTimeout() const
651  {
652  auto now = std::chrono::steady_clock::now();
653  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
654  auto timeUntilNextActivity = this->timeNextActivity - now;
655 
656  int t = static_cast<int>(
657  std::chrono::duration_cast<std::chrono::milliseconds>
658  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
659  int t2 = std::min(t, this->kTimeout);
660  return std::max(t2, 0);
661  }
662 
664  private: void RecvMessages()
665  {
666  bool timeToExit = false;
667  while (!timeToExit)
668  {
669  // Calculate the timeout.
670  int timeout = this->NextTimeout();
671 
672  if (pollSockets(this->sockets, timeout))
673  {
674  this->RecvDiscoveryUpdate();
675 
676  if (this->verbose)
677  this->PrintCurrentState();
678  }
679 
680  this->UpdateHeartbeat();
681  this->UpdateActivity();
682 
683  // Is it time to exit?
684  {
685  std::lock_guard<std::mutex> lock(this->exitMutex);
686  if (this->exit)
687  timeToExit = true;
688  }
689  }
690  }
691 
693  private: void RecvDiscoveryUpdate()
694  {
695  char rcvStr[Discovery::kMaxRcvStr];
696  std::string srcAddr;
697  uint16_t srcPort;
698  sockaddr_in clntAddr;
699  socklen_t addrLen = sizeof(clntAddr);
700 
701  if ((recvfrom(this->sockets.at(0),
702  reinterpret_cast<raw_type *>(rcvStr),
703  this->kMaxRcvStr, 0,
704  reinterpret_cast<sockaddr *>(&clntAddr),
705  reinterpret_cast<socklen_t *>(&addrLen))) < 0)
706  {
707  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
708  << std::endl;
709  return;
710  }
711  srcAddr = inet_ntoa(clntAddr.sin_addr);
712  srcPort = ntohs(clntAddr.sin_port);
713 
714  if (this->verbose)
715  {
716  std::cout << "\nReceived discovery update from " << srcAddr << ": "
717  << srcPort << std::endl;
718  }
719 
720  this->DispatchDiscoveryMsg(srcAddr, rcvStr);
721  }
722 
723 
727  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
728  char *_msg)
729  {
730  Header header;
731  char *pBody = _msg;
732 
733  // Create the header from the raw bytes.
734  header.Unpack(_msg);
735  pBody += header.HeaderLength();
736 
737  // Discard the message if the wire protocol is different than mine.
738  if (this->kWireVersion != header.Version())
739  return;
740 
741  auto recvPUuid = header.PUuid();
742 
743  // Discard our own discovery messages.
744  if (recvPUuid == this->pUuid)
745  return;
746 
747  // Update timestamp and cache the callbacks.
748  DiscoveryCallback<Pub> connectCb;
749  DiscoveryCallback<Pub> disconnectCb;
750  {
751  std::lock_guard<std::mutex> lock(this->mutex);
752  this->activity[recvPUuid] = std::chrono::steady_clock::now();
753  connectCb = this->connectionCb;
754  disconnectCb = this->disconnectionCb;
755  }
756 
757  switch (header.Type())
758  {
759  case AdvType:
760  {
761  // Read the rest of the fields.
762  transport::AdvertiseMessage<Pub> advMsg;
763  advMsg.Unpack(pBody);
764 
765  // Check scope of the topic.
766  if ((advMsg.Publisher().Options().Scope() == Scope_t::PROCESS) ||
767  (advMsg.Publisher().Options().Scope() == Scope_t::HOST &&
768  _fromIp != this->hostAddr))
769  {
770  return;
771  }
772 
773  // Register an advertised address for the topic.
774  bool added;
775  {
776  std::lock_guard<std::mutex> lock(this->mutex);
777  added = this->info.AddPublisher(advMsg.Publisher());
778  }
779 
780  if (added && connectCb)
781  {
782  // Execute the client's callback.
783  connectCb(advMsg.Publisher());
784  }
785 
786  break;
787  }
788  case SubType:
789  {
790  // Read the rest of the fields.
791  SubscriptionMsg subMsg;
792  subMsg.Unpack(pBody);
793  auto recvTopic = subMsg.Topic();
794 
795  // Check if at least one of my nodes advertises the topic requested.
796  Addresses_M<Pub> addresses;
797  {
798  std::lock_guard<std::mutex> lock(this->mutex);
799  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
800  {
801  break;
802  }
803 
804  if (!this->info.Publishers(recvTopic, addresses))
805  break;
806  }
807 
808  for (const auto &nodeInfo : addresses[this->pUuid])
809  {
810  // Check scope of the topic.
811  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
812  (nodeInfo.Options().Scope() == Scope_t::HOST &&
813  _fromIp != this->hostAddr))
814  {
815  continue;
816  }
817 
818  // Answer an ADVERTISE message.
819  this->SendMsg(AdvType, nodeInfo);
820  }
821 
822  break;
823  }
824  case HeartbeatType:
825  {
826  // The timestamp has already been updated.
827  break;
828  }
829  case ByeType:
830  {
831  // Remove the activity entry for this publisher.
832  {
833  std::lock_guard<std::mutex> lock(this->mutex);
834  this->activity.erase(recvPUuid);
835  }
836 
837  if (disconnectCb)
838  {
839  Pub pub;
840  pub.SetPUuid(recvPUuid);
841  // Notify the new disconnection.
842  disconnectCb(pub);
843  }
844 
845  // Remove the address entry for this topic.
846  {
847  std::lock_guard<std::mutex> lock(this->mutex);
848  this->info.DelPublishersByProc(recvPUuid);
849  }
850 
851  break;
852  }
853  case UnadvType:
854  {
855  // Read the address.
856  transport::AdvertiseMessage<Pub> advMsg;
857  advMsg.Unpack(pBody);
858 
859  // Check scope of the topic.
860  if ((advMsg.Publisher().Options().Scope() == Scope_t::PROCESS) ||
861  (advMsg.Publisher().Options().Scope() == Scope_t::HOST &&
862  _fromIp != this->hostAddr))
863  {
864  return;
865  }
866 
867  if (disconnectCb)
868  {
869  // Notify the new disconnection.
870  disconnectCb(advMsg.Publisher());
871  }
872 
873  // Remove the address entry for this topic.
874  {
875  std::lock_guard<std::mutex> lock(this->mutex);
876  this->info.DelPublisherByNode(advMsg.Publisher().Topic(),
877  advMsg.Publisher().PUuid(), advMsg.Publisher().NUuid());
878  }
879 
880  break;
881  }
882  default:
883  {
884  std::cerr << "Unknown message type [" << header.Type() << "]\n";
885  break;
886  }
887  }
888  }
889 
896  private: template<typename T>
897  void SendMsg(const uint8_t _type,
898  const T &_pub,
899  const uint16_t _flags = 0) const
900  {
901  // Create the header.
902  Header header(this->Version(), _pub.PUuid(), _type, _flags);
903  auto msgLength = 0;
904  std::vector<char> buffer;
905 
906  std::string topic = _pub.Topic();
907 
908  switch (_type)
909  {
910  case AdvType:
911  case UnadvType:
912  {
913  // Create the [UN]ADVERTISE message.
914  transport::AdvertiseMessage<T> advMsg(header, _pub);
915 
916  // Allocate a buffer and serialize the message.
917  buffer.resize(advMsg.MsgLength());
918  advMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
919  msgLength = static_cast<int>(advMsg.MsgLength());
920  break;
921  }
922  case SubType:
923  {
924  // Create the [UN]SUBSCRIBE message.
925  SubscriptionMsg subMsg(header, topic);
926 
927  // Allocate a buffer and serialize the message.
928  buffer.resize(subMsg.MsgLength());
929  subMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
930  msgLength = static_cast<int>(subMsg.MsgLength());
931  break;
932  }
933  case HeartbeatType:
934  case ByeType:
935  {
936  // Allocate a buffer and serialize the message.
937  buffer.resize(header.HeaderLength());
938  header.Pack(reinterpret_cast<char*>(&buffer[0]));
939  msgLength = header.HeaderLength();
940  break;
941  }
942  default:
943  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
944  << " type [" << _type << "]" << std::endl;
945  return;
946  }
947 
948  // Send the discovery message to the multicast group through all the
949  // sockets.
950  for (const auto &sock : this->Sockets())
951  {
952  if (sendto(sock, reinterpret_cast<const raw_type *>(
953  reinterpret_cast<unsigned char*>(&buffer[0])),
954  msgLength, 0,
955  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
956  sizeof(*(this->MulticastAddr()))) != msgLength)
957  {
958  std::cerr << "Exception sending a message" << std::endl;
959  return;
960  }
961  }
962 
963  if (this->Verbose())
964  {
965  std::cout << "\t* Sending " << MsgTypesStr[_type]
966  << " msg [" << topic << "]" << std::endl;
967  }
968  }
969 
972  private: const std::vector<int> &Sockets() const
973  {
974  return this->sockets;
975  }
976 
979  private: const sockaddr_in *MulticastAddr() const
980  {
981  return &this->mcastAddr;
982  }
983 
986  private: bool Verbose() const
987  {
988  return this->verbose;
989  }
990 
993  private: uint8_t Version() const
994  {
995  return this->kWireVersion;
996  }
997 
1002  private: bool RegisterNetIface(const std::string &_ip)
1003  {
1004  // Make a new socket for sending discovery information.
1005  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1006  if (sock < 0)
1007  {
1008  std::cerr << "Socket creation failed." << std::endl;
1009  return false;
1010  }
1011 
1012  // Socket option: IP_MULTICAST_IF.
1013  // This socket option needs to be applied to each socket used to send
1014  // data. This option selects the source interface for outgoing messages.
1015  struct in_addr ifAddr;
1016  ifAddr.s_addr = inet_addr(_ip.c_str());
1017  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1018  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1019  {
1020  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1021  << std::endl;
1022  return false;
1023  }
1024 
1025  this->sockets.push_back(sock);
1026 
1027  // Join the multicast group. We have to do it for each network interface
1028  // but we can do it on the same socket. We will use the socket at
1029  // position 0 for receiving multicast information.
1030  struct ip_mreq group;
1031  group.imr_multiaddr.s_addr =
1032  inet_addr(this->kMulticastGroup.c_str());
1033  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1034  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1035  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1036  {
1037  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1038  << std::endl;
1039  return false;
1040  }
1041 
1042  return true;
1043  }
1044 
1048  private: static const unsigned int kDefActivityInterval = 100;
1049 
1053  private: static const unsigned int kDefHeartbeatInterval = 1000;
1054 
1058  private: static const unsigned int kDefSilenceInterval = 3000;
1059 
1061  private: const std::string kMulticastGroup = "224.0.0.7";
1062 
1064  private: const int kTimeout = 250;
1065 
1067  private: static const int kMaxRcvStr = 65536;
1068 
1071  private: static const uint8_t kWireVersion = 8;
1072 
1074  private: int port;
1075 
1077  private: std::string hostAddr;
1078 
1080  private: std::vector<std::string> hostInterfaces;
1081 
1083  private: std::string pUuid;
1084 
1088  private: unsigned int silenceInterval;
1089 
1093  private: unsigned int activityInterval;
1094 
1098  private: unsigned int heartbeatInterval;
1099 
1101  private: DiscoveryCallback<Pub> connectionCb;
1102 
1104  private: DiscoveryCallback<Pub> disconnectionCb;
1105 
1107  private: TopicStorage<Pub> info;
1108 
1114 
1116  private: bool verbose;
1117 
1119  private: std::vector<int> sockets;
1120 
1122  private: sockaddr_in mcastAddr;
1123 
1125  private: mutable std::mutex mutex;
1126 
1128  private: std::thread threadReception;
1129 
1131  private: Timestamp timeNextHeartbeat;
1132 
1134  private: Timestamp timeNextActivity;
1135 
1137  private: std::mutex exitMutex;
1138 
1143  private: bool initialized;
1144 
1146  private: unsigned int numHeartbeatsUninitialized;
1147 
1149  private: mutable std::condition_variable initializedCv;
1150 
1152  private: bool exit;
1153 
1155  private: bool enabled;
1156  };
1157 
1161 
1165  }
1166 }
1167 
1168 #endif
void raw_type
Definition: Discovery.hh:43
T at(T... args)
T cbegin(T... args)
T boolalpha(T... args)
T c_str(T... args)
A class that is used to store information about an advertised publisher. An instance of this class is...
Definition: Node.hh:521
A class for customizing the publication options for a topic or service advertised....
Definition: AdvertiseOptions.hh:55
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:97
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:539
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:245
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:269
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds.
Definition: Discovery.hh:412
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes....
Definition: Discovery.hh:423
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:350
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:402
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1113
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:374
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:451
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:433
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:487
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:460
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:360
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected,...
Definition: Discovery.hh:470
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:300
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:442
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active,...
Definition: Discovery.hh:480
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:214
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:530
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:103
This class stores all the information about a publisher. It stores the topic name that publishes,...
Definition: Publisher.hh:38
bool Publishers(const std::string &_topic, std::map< std::string, std::vector< T >> &_info) const
Get the map of publishers stored for a given topic.
Definition: TopicStorage.hh:203
bool HasAnyPublishers(const std::string &_topic, const std::string &_pUuid) const
Return if there is any publisher stored for the given topic and process UUID.
Definition: TopicStorage.hh:130
void PublishersByProc(const std::string &_pUuid, std::map< std::string, std::vector< T >> &_pubs) const
Given a process UUID, the function returns the list of publishers contained in this process UUID with...
Definition: TopicStorage.hh:282
bool DelPublisherByNode(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid)
Remove a publisher associated to a given topic and UUID pair.
Definition: TopicStorage.hh:218
bool Publisher(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid, T &_publisher) const
Get the address information for a given topic and node UUID.
Definition: TopicStorage.hh:165
bool DelPublishersByProc(const std::string &_pUuid)
Remove all the publishers associated to a given process.
Definition: TopicStorage.hh:258
bool AddPublisher(const T &_publisher)
Add a new address associated to a given topic and node UUID.
Definition: TopicStorage.hh:49
void Print() const
Print all the information for debugging purposes.
Definition: TopicStorage.hh:342
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently stored.
Definition: TopicStorage.hh:335
T duration_cast(T... args)
T empty(T... args)
T endl(T... args)
T erase(T... args)
T join(T... args)
T joinable(T... args)
T lock(T... args)
T max(T... args)
T min(T... args)
SrvAddresses_M addresses
Definition: Node.hh:986
static const uint8_t ByeType
Definition: Packet.hh:39
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:151
static const uint8_t UnadvType
Definition: Packet.hh:37
*brief Advertise a new service without any output parameter *In this version the callback is a free function *param[in] _topic Topic name associated to the service *param[in] _cb Callback to handle the service request with the *following void(* _cb)(const RequestT &_req)
Definition: Node.hh:527
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
static const uint8_t SubType
Definition: Packet.hh:36
@ HOST
Topic/service only available to subscribers in the same machine as the publisher.
@ PROCESS
Topic/service only available to subscribers in the same process as the publisher.
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github....
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
static const std::vector< std::string > MsgTypesStr
Used for debugging the message type received/send.
Definition: Packet.hh:44
static const uint8_t HeartbeatType
Definition: Packet.hh:38
cb(_internalRep, _internalResult)
static const uint8_t AdvType
Definition: Packet.hh:35
*brief Advertise a new service without any output parameter *In this version the callback is a free function *param[in] _topic Topic name associated to the service *param[in] _cb Callback to handle the service request with the *following void(*) const AdvertiseServiceOptions ReplyT const std::string _topic)
Definition: Node.hh:558
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
std::unique_lock< std::recursive_mutex > lk(this->Shared() ->mutex)
Definition: AdvertiseOptions.hh:28
T push_back(T... args)
T resize(T... args)
T unlock(T... args)