Fawkes API  Fawkes Development Version
mongodb_log_tf_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_tf_thread.cpp - MongoDB transforms logging Thread
4  *
5  * Created: Tue Dec 11 14:58:00 2012 (Freiburg)
6  * Copyright 2010-2017 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "mongodb_log_tf_thread.h"
23 
24 #include <core/threading/mutex_locker.h>
25 #include <plugins/mongodb/aspect/mongodb_conncreator.h>
26 #include <tf/time_cache.h>
27 #include <utils/time/wait.h>
28 
29 // from MongoDB
30 #include <mongo/client/dbclient.h>
31 
32 #include <cstdlib>
33 
34 using namespace mongo;
35 using namespace fawkes;
36 
37 /** @class MongoLogTransformsThread "mongodb_log_tf_thread.h"
38  * MongoDB transforms logging thread.
39  * This thread periodically queries the transformer for recent transforms
40  * and stores them to the database.
41  *
42  * @author Tim Niemueller
43  */
44 
45 /** Constructor. */
47 : Thread("MongoLogTransformsThread", Thread::OPMODE_CONTINUOUS),
48  MongoDBAspect("default"),
49  TransformAspect(TransformAspect::ONLY_LISTENER)
50 {
52 }
53 
54 /** Destructor. */
56 {
57 }
58 
59 void
61 {
62  database_ = "fflog";
63  collection_ = "tf";
64  try {
65  database_ = config->get_string("/plugins/mongodb-log/database");
66  } catch (Exception &e) {
67  logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
68  }
69 
70  try {
71  collection_ = config->get_string("/plugins/mongodb-log/transforms/collection");
72  } catch (Exception &e) {
73  logger->log_info(name(), "No transforms collection configured, using %s", collection_.c_str());
74  }
75  collection_ = database_ + "." + collection_;
76 
77  cfg_storage_interval_ = config->get_float("/plugins/mongodb-log/transforms/storage-interval");
78 
79  if (cfg_storage_interval_ <= 0.) {
80  // 50% of the cache time, assume 50% is enough to store the data
81  cfg_storage_interval_ = tf_listener->get_cache_time() * 0.5;
82  }
83 
84  wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
85  mutex_ = new Mutex();
86 }
87 
88 bool
90 {
91  mutex_->lock();
92  return true;
93 }
94 
95 void
97 {
98  delete wait_;
99  delete mutex_;
100 }
101 
102 void
104 {
105  mutex_->lock();
106  fawkes::Time loop_start(clock);
107  wait_->mark_start();
108  std::vector<fawkes::Time> tf_range_start;
109  std::vector<fawkes::Time> tf_range_end;
110  ;
111 
112  tf_listener->lock();
113  std::vector<tf::TimeCacheInterfacePtr> caches = tf_listener->get_frame_caches();
114  std::vector<tf::TimeCacheInterfacePtr> copies(caches.size(), tf::TimeCacheInterfacePtr());
115 
116  const size_t n_caches = caches.size();
117  tf_range_start.resize(n_caches, fawkes::Time(0, 0));
118  tf_range_end.resize(n_caches, fawkes::Time(0, 0));
119  if (last_tf_range_end_.size() != n_caches) {
120  last_tf_range_end_.resize(n_caches, fawkes::Time(0, 0));
121  }
122 
123  unsigned int num_transforms = 0;
124  unsigned int num_upd_caches = 0;
125 
126  for (size_t i = 0; i < n_caches; ++i) {
127  if (caches[i]) {
128  tf_range_end[i] = caches[i]->get_latest_timestamp();
129  if (last_tf_range_end_[i] != tf_range_end[i]) {
130  // we have new data
131  if (!tf_range_end[i].is_zero()) {
132  tf_range_start[i] = tf_range_end[i] - cfg_storage_interval_;
133  if (last_tf_range_end_[i] > tf_range_start[i]) {
134  tf_range_start[i] = last_tf_range_end_[i];
135  }
136  }
137  copies[i] = caches[i]->clone(tf_range_start[i]);
138  last_tf_range_end_[i] = tf_range_end[i];
139  num_upd_caches += 1;
140  num_transforms += copies[i]->get_list_length();
141  }
142  }
143  }
144  tf_listener->unlock();
145 
146  store(copies, tf_range_start, tf_range_end);
147 
148  mutex_->unlock();
149  // -1 to subtract "NO PARENT" pseudo cache
150  fawkes::Time loop_end(clock);
151  logger->log_debug(name(),
152  "%u transforms for %u updated frames stored in %.1f ms",
153  num_transforms,
154  num_upd_caches,
155  (loop_end - &loop_start) * 1000.);
156  wait_->wait();
157 }
158 
159 void
160 MongoLogTransformsThread::store(std::vector<tf::TimeCacheInterfacePtr> &caches,
161  std::vector<fawkes::Time> & from,
162  std::vector<fawkes::Time> & to)
163 {
164  std::vector<std::string> frame_map = tf_listener->get_frame_id_mappings();
165 
166  for (size_t i = 0; i < caches.size(); ++i) {
167  tf::TimeCacheInterfacePtr tc = caches[i];
168  if (!tc)
169  continue;
170 
171  BSONObjBuilder document;
172  document.append("timestamp", (long long)from[i].in_msec());
173  document.append("timestamp_from", (long long)from[i].in_msec());
174  document.append("timestamp_to", (long long)to[i].in_msec());
175  const tf::TimeCache::L_TransformStorage &storage = tc->get_storage();
176 
177  if (storage.empty()) {
178  /*
179  fawkes::Time now(clock);
180  logger->log_warn(name(), "Empty storage for %s (start: %lu end: %lu now: %lu",
181  frame_map[i].c_str(), from[i].in_msec(), to[i].in_msec(),
182  now.in_msec());
183  */
184  continue;
185  }
186 
187  document.append("frame", frame_map[storage.front().frame_id]);
188  document.append("child_frame", frame_map[storage.front().child_frame_id]);
189 
190  BSONArrayBuilder tfl_array(document.subarrayStart("transforms"));
191  /*
192  logger->log_debug(name(), "Writing %zu transforms for child frame %s",
193  storage.size(), frame_map[i].c_str());
194  */
195 
196  tf::TimeCache::L_TransformStorage::const_iterator s;
197  for (s = storage.begin(); s != storage.end(); ++s) {
198  BSONObjBuilder tf_doc(tfl_array.subobjStart());
199 
200  /*
201  "frame" : "/bl_caster_rotation_link",
202  "child_frame" : "/bl_caster_l_wheel_link",
203  "translation" : [
204  0,
205  0.049,
206  0
207  ],
208  "rotation" : [
209  0,
210  0.607301261865804,
211  0,
212  0.7944716340664417
213  ]
214  */
215 
216  tf_doc.append("timestamp", (long long)s->stamp.in_msec());
217  tf_doc.append("frame", frame_map[s->frame_id]);
218  tf_doc.append("child_frame", frame_map[s->child_frame_id]);
219 
220  BSONArrayBuilder rot_arr(tf_doc.subarrayStart("rotation"));
221  rot_arr.append(s->rotation.x());
222  rot_arr.append(s->rotation.y());
223  rot_arr.append(s->rotation.z());
224  rot_arr.append(s->rotation.w());
225  rot_arr.doneFast();
226 
227  BSONArrayBuilder trans_arr(tf_doc.subarrayStart("translation"));
228  trans_arr.append(s->translation.x());
229  trans_arr.append(s->translation.y());
230  trans_arr.append(s->translation.z());
231  trans_arr.doneFast();
232 
233  tf_doc.doneFast();
234  }
235 
236  tfl_array.doneFast();
237 
238  try {
239  mongodb_client->insert(collection_, document.obj());
240  } catch (mongo::DBException &e) {
241  logger->log_warn(name(), "Inserting TF failed: %s", e.what());
242 
243  } catch (std::exception &e) {
244  // for old and broken compilers
245  logger->log_warn(name(), "Inserting TF failed: %s (*)", e.what());
246  }
247  }
248 }
void wait()
Wait until minimum loop time has been reached.
Definition: wait.cpp:78
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
std::vector< std::string > get_frame_id_mappings() const
Get mappings from frame ID to names.
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:55
A class for handling time.
Definition: time.h:92
virtual void init()
Initialize the thread.
void lock()
Lock transformer.
Thread class encapsulation of pthreads.
Definition: thread.h:45
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Thread aspect to access the transform system.
Definition: tf.h:38
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
float get_cache_time() const
Get cache time.
virtual void finalize()
Finalize the thread.
Base class for exceptions in Fawkes.
Definition: exception.h:35
Thread aspect to access MongoDB.
Definition: mongodb.h:39
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
const char * name() const
Get name of thread.
Definition: thread.h:100
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void loop()
Code to execute in the thread.
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
void lock()
Lock this mutex.
Definition: mutex.cpp:87
virtual ~MongoLogTransformsThread()
Destructor.
Mutex mutual exclusion lock.
Definition: mutex.h:32
tf::Transformer * tf_listener
This is the transform listener which saves transforms published by other threads in the system.
Definition: tf.h:67
std::vector< TimeCacheInterfacePtr > get_frame_caches() const
Get all currently existing caches.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
Time wait utility.
Definition: wait.h:32
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.