Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
20 
21 #include <tbb/task.h>
22 #include <boost/bind.hpp>
23 #include <boost/enable_shared_from_this.hpp>
24 #include <map>
25 #include <list>
26 #include <string>
27 #include <vector>
28 
31 #include "gazebo/util/system.hh"
32 
33 namespace gazebo
34 {
35  namespace transport
36  {
39  class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
40  {
44  public: PublishTask(transport::PublisherPtr _pub,
45  const google::protobuf::Message &_message)
46  : pub(_pub)
47  {
48  this->msg = _message.New();
49  this->msg->CopyFrom(_message);
50  }
51 
54  public: tbb::task *execute()
55  {
56  this->pub->WaitForConnection();
57  this->pub->Publish(*this->msg, true);
58  this->pub->SendMessage();
59  delete this->msg;
60  this->pub.reset();
61  return NULL;
62  }
63 
65  private: transport::PublisherPtr pub;
66 
68  private: google::protobuf::Message *msg;
69  };
71 
74 
78  class GZ_TRANSPORT_VISIBLE Node :
79  public boost::enable_shared_from_this<Node>
80  {
82  public: Node();
83 
85  public: virtual ~Node();
86 
95  public: void Init(const std::string &_space ="");
96 
108  public: bool TryInit(
109  const common::Time &_maxWait = common::Time(1, 0));
110 
115  public: bool IsInitialized() const;
116 
118  public: void Fini();
119 
122  public: std::string GetTopicNamespace() const;
123 
127  public: std::string DecodeTopicName(const std::string &_topic);
128 
132  public: std::string EncodeTopicName(const std::string &_topic);
133 
136  public: unsigned int GetId() const;
137 
140  public: void ProcessPublishers();
141 
143  public: void ProcessIncoming();
144 
148  public: bool HasLatchedSubscriber(const std::string &_topic) const;
149 
150 
157  public: template<typename M>
158  void Publish(const std::string &_topic,
159  const google::protobuf::Message &_message)
160  {
161  transport::PublisherPtr pub = this->Advertise<M>(_topic);
162  PublishTask *task = new(tbb::task::allocate_root())
163  PublishTask(pub, _message);
164 
165  tbb::task::enqueue(*task);
166  return;
167  }
168 
176  public: template<typename M>
177  transport::PublisherPtr Advertise(const std::string &_topic,
178  unsigned int _queueLimit = 1000,
179  double _hzRate = 0)
180  {
181  std::string decodedTopic = this->DecodeTopicName(_topic);
182  PublisherPtr publisher =
183  transport::TopicManager::Instance()->Advertise<M>(
184  decodedTopic, _queueLimit, _hzRate);
185 
186  boost::mutex::scoped_lock lock(this->publisherMutex);
187  publisher->SetNode(shared_from_this());
188  this->publishers.push_back(publisher);
189 
190  return publisher;
191  }
192 
200  public: template<typename M, typename T>
201  SubscriberPtr Subscribe(const std::string &_topic,
202  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
203  bool _latching = false)
204  {
205  SubscribeOptions ops;
206  std::string decodedTopic = this->DecodeTopicName(_topic);
207  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
208 
209  {
210  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
211  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
212  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
213  }
214 
215  SubscriberPtr result =
216  transport::TopicManager::Instance()->Subscribe(ops);
217 
218  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
219 
220  return result;
221  }
222 
229  public: template<typename M>
230  SubscriberPtr Subscribe(const std::string &_topic,
231  void(*_fp)(const boost::shared_ptr<M const> &),
232  bool _latching = false)
233  {
234  SubscribeOptions ops;
235  std::string decodedTopic = this->DecodeTopicName(_topic);
236  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
237 
238  {
239  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
240  this->callbacks[decodedTopic].push_back(
241  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
242  }
243 
244  SubscriberPtr result =
245  transport::TopicManager::Instance()->Subscribe(ops);
246 
247  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
248 
249  return result;
250  }
251 
259  template<typename T>
260  SubscriberPtr Subscribe(const std::string &_topic,
261  void(T::*_fp)(const std::string &), T *_obj,
262  bool _latching = false)
263  {
264  SubscribeOptions ops;
265  std::string decodedTopic = this->DecodeTopicName(_topic);
266  ops.Init(decodedTopic, shared_from_this(), _latching);
267 
268  {
269  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
270  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
271  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
272  }
273 
274  SubscriberPtr result =
275  transport::TopicManager::Instance()->Subscribe(ops);
276 
277  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
278 
279  return result;
280  }
281 
282 
289  SubscriberPtr Subscribe(const std::string &_topic,
290  void(*_fp)(const std::string &), bool _latching = false)
291  {
292  SubscribeOptions ops;
293  std::string decodedTopic = this->DecodeTopicName(_topic);
294  ops.Init(decodedTopic, shared_from_this(), _latching);
295 
296  {
297  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
298  this->callbacks[decodedTopic].push_back(
300  }
301 
302  SubscriberPtr result =
303  transport::TopicManager::Instance()->Subscribe(ops);
304 
305  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
306 
307  return result;
308  }
309 
314  public: bool HandleData(const std::string &_topic,
315  const std::string &_msg);
316 
321  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
322 
329  public: void InsertLatchedMsg(const std::string &_topic,
330  const std::string &_msg);
331 
338  public: void InsertLatchedMsg(const std::string &_topic,
339  MessagePtr _msg);
340 
344  public: std::string GetMsgType(const std::string &_topic) const;
345 
351  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
352 
364  private: bool PrivateInit(const std::string &_space,
365  const common::Time &_maxWait,
366  const bool _fallbackToDefault);
367 
368  private: std::string topicNamespace;
369  private: std::vector<PublisherPtr> publishers;
370  private: std::vector<PublisherPtr>::iterator publishersIter;
371  private: static unsigned int idCounter;
372  private: unsigned int id;
373 
374  private: typedef std::list<CallbackHelperPtr> Callback_L;
375  private: typedef std::map<std::string, Callback_L> Callback_M;
376  private: Callback_M callbacks;
377  private: std::map<std::string, std::list<std::string> > incomingMsgs;
378 
380  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
381 
382  private: boost::mutex publisherMutex;
383  private: boost::mutex publisherDeleteMutex;
384  private: boost::recursive_mutex incomingMutex;
385 
388  private: boost::recursive_mutex processIncomingMutex;
389 
390  private: bool initialized;
391  };
393  }
394 }
395 #endif
Options for a subscription.
Definition: SubscribeOptions.hh:35
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Adverise a topic.
Definition: Node.hh:177
Forward declarations for the common classes.
Definition: Animation.hh:26
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:260
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
Forward declarations for transport.
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:201
Callback helper Template.
Definition: CallbackHelper.hh:111
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:230
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:78
#define NULL
Definition: CommonTypes.hh:31
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:289
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:158
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:177
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
A Time class, can be used to hold wall- or sim-time.
Definition: Time.hh:44
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36