Fawkes API  Fawkes Development Version
pcl_db_pipeline.h
1 
2 /***************************************************************************
3  * pcl_db_pipeline.h - PCL DB processing pipeline base class
4  *
5  * Created: Wed Aug 21 17:24:18 2013
6  * Copyright 2012-2013 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #ifndef _PLUGINS_PERCEPTION_PCL_DB_PCL_DB_PIPELINE_H_
23 #define _PLUGINS_PERCEPTION_PCL_DB_PCL_DB_PIPELINE_H_
24 
25 #include "mongodb_tf_transformer.h"
26 
27 #include <config/config.h>
28 #include <logging/logger.h>
29 #include <pcl/point_cloud.h>
30 #include <pcl/point_types.h>
31 #include <pcl_utils/comparisons.h>
32 #include <pcl_utils/storage_adapter.h>
33 #include <pcl_utils/transforms.h>
34 #include <pcl_utils/utils.h>
35 
36 #include <Eigen/Core>
37 #ifdef USE_TIMETRACKER
38 # include <utils/time/tracker.h>
39 #endif
40 #include <utils/time/tracker_macros.h>
41 
42 #define USE_ALIGNMENT
43 #define USE_ICP_ALIGNMENT
44 // define USE_NDT_ALIGNMENT
45 
46 #define CFG_PREFIX "/perception/pcl-db/"
47 
48 #include <mongo/client/dbclient.h>
49 #include <mongo/client/gridfs.h>
50 #include <pcl/filters/approximate_voxel_grid.h>
51 #include <pcl/filters/conditional_removal.h>
52 #include <pcl/filters/extract_indices.h>
53 #include <pcl/filters/passthrough.h>
54 #include <pcl/filters/voxel_grid.h>
55 #include <pcl/point_cloud.h>
56 #include <pcl/point_types.h>
57 #include <pcl/segmentation/sac_segmentation.h>
58 #include <pcl/surface/convex_hull.h>
59 
60 #ifdef HAVE_MONGODB_VERSION_H
61 // we are using mongo-cxx-driver which renamed QUERY to MONGO_QUERY
62 # define QUERY MONGO_QUERY
63 #endif
64 
65 static const uint8_t cluster_colors[12][3] = {{176, 0, 30},
66  {0, 0, 255},
67  {255, 90, 0},
68  {137, 82, 39},
69  {56, 23, 90},
70  {99, 0, 30},
71  {255, 0, 0},
72  {0, 255, 0},
73  {255, 255, 0},
74  {255, 0, 255},
75  {0, 255, 255},
76  {27, 117, 196}};
77 
78 typedef enum { APPLICABLE = 0, TYPE_MISMATCH, NO_POINTCLOUD, QUERY_FAILED } ApplicabilityStatus;
79 
80 /** Database point cloud pipeline base class.
81  * Common functionality for pcl-db-* plugins operating on
82  * point clouds restored from MongoDB.
83  * @author Tim Niemueller
84  */
85 template <typename PointType>
87 {
88 protected:
89  /** Basic point cloud type. */
91 
92  /** Colored point type */
93  typedef pcl::PointXYZRGB ColorPointType;
94  /** Type for colored point clouds based on ColorPointType. */
96  /** Shared pointer to cloud. */
97  typedef typename Cloud::Ptr CloudPtr;
98  /** Shared pointer to constant cloud. */
99  typedef typename Cloud::ConstPtr CloudConstPtr;
100 
101  /** Shared pointer to colored cloud. */
102  typedef typename ColorCloud::Ptr ColorCloudPtr;
103  /** Shared pointer to constant colored cloud. */
104  typedef typename ColorCloud::ConstPtr ColorCloudConstPtr;
105 
106 public:
107  /** Constructor.
108  * @param mongodb_client MongoDB client
109  * @param config configuration
110  * @param logger Logger
111  * @param output output point cloud
112  */
113  PointCloudDBPipeline(mongo::DBClientBase * mongodb_client,
114  fawkes::Configuration *config,
115  fawkes::Logger * logger,
116  ColorCloudPtr output)
117  : mongodb_client_(mongodb_client), logger_(logger), output_(output)
118  {
119  name_ = "PCL_DB_Pipeline";
120 
121  cfg_pcl_age_tolerance_ = (long)round(config->get_float(CFG_PREFIX "pcl-age-tolerance") * 1000.);
122  std::vector<float> transform_range = config->get_floats(CFG_PREFIX "transform-range");
123  if (transform_range.size() != 2) {
124  throw fawkes::Exception("Transform range must be a list with exactly two elements");
125  }
126  if (transform_range[1] < transform_range[0]) {
127  throw fawkes::Exception("Transform range start cannot be smaller than end");
128  }
129  cfg_transform_range_[0] = (long)round(transform_range[0] * 1000.);
130  cfg_transform_range_[1] = (long)round(transform_range[1] * 1000.);
131  }
132 
133  /** Destructor. */
135  {
136  }
137 
138  /** Check if this pipeline instance is suitable for the given times.
139  * Retrieves information about the point clouds for the specified
140  * \p times and checks if this pipeline (depending on the template
141  * parameter) is suitable for the processing of these pipelines.
142  * @param times times for which to check the point clouds
143  * @param database ddatabase from which to retrieve the information
144  * @param collection collection from which to retrieve the information
145  * @return applicability status
146  */
147  ApplicabilityStatus
148  applicable(std::vector<long long> &times, std::string &database, std::string &collection)
149  {
150  const unsigned int num_clouds = times.size();
151 
152 #if PCL_VERSION_COMPARE(>=, 1, 7, 0)
153  std::vector<pcl::PCLPointField> pfields;
154 #else
155  std::vector<sensor_msgs::PointField> pfields;
156 #endif
157  pcl::for_each_type<typename pcl::traits::fieldList<PointType>::type>(
158  pcl::detail::FieldAdder<PointType>(pfields));
159 
160  std::string fq_collection = database + "." + collection;
161  try {
162  for (unsigned int i = 0; i < num_clouds; ++i) {
163 #if __cplusplus >= 201103L
164  std::unique_ptr<mongo::DBClientCursor> cursor =
165 #else
166  std::auto_ptr<mongo::DBClientCursor> cursor =
167 #endif
168  mongodb_client_->query(fq_collection,
169  QUERY("timestamp" << mongo::LTE << times[i] << mongo::GTE
170  << (times[i] - cfg_pcl_age_tolerance_))
171  .sort("timestamp", -1),
172  /* limit */ 1);
173 
174  if (cursor->more()) {
175  mongo::BSONObj p = cursor->next();
176  mongo::BSONObj pcldoc = p.getObjectField("pointcloud");
177  std::vector<mongo::BSONElement> fields = pcldoc["field_info"].Array();
178 
179  if (fields.size() == pfields.size()) {
180  for (unsigned int i = 0; i < pfields.size(); ++i) {
181 #if PCL_VERSION_COMPARE(>=, 1, 7, 0)
182  pcl::PCLPointField &pf = pfields[i];
183 #else
184  sensor_msgs::PointField &pf = pfields[i];
185 #endif
186 
187  bool found = false;
188  for (unsigned int j = 0; j < fields.size(); ++j) {
189  if ((fields[j]["name"].String() == pf.name)
190  && (fields[j]["offset"].Int() == (int)pf.offset)
191  && (fields[j]["datatype"].Int() == pf.datatype)
192  && (fields[j]["count"].Int() == (int)pf.count)) {
193  found = true;
194  break;
195  }
196  }
197  if (!found) {
199  "Type mismatch (fields) for pointcloud "
200  "at timestamp %lli",
201  times[i]);
202  return TYPE_MISMATCH;
203  }
204  }
205  } else {
207  "Type mismatch (num fields) for pointcloud "
208  "at timestamp %lli",
209  times[i]);
210  return TYPE_MISMATCH;
211  }
212  } else {
214  "No pointclouds for timestamp %lli in %s",
215  times[i],
216  fq_collection.c_str());
217  return NO_POINTCLOUD;
218  }
219  }
220  } catch (mongo::DBException &e) {
221  logger_->log_warn(name_, "MongoDB query failed: %s", e.what());
222  return QUERY_FAILED;
223  }
224 
225  return APPLICABLE;
226  }
227 
228 protected: // methods
229  /** Read a file from MongoDB GridFS.
230  * @param dataptr Pointer to buffer to read data to. Make sure it is of
231  * sufficient size.
232  * @param database database from which to read the file
233  * @param filename name of file to read from GridFS.
234  */
235  void
236  read_gridfs_file(void *dataptr, std::string &database, std::string filename)
237  {
238  char * tmp = (char *)dataptr;
239  mongo::GridFS gridfs(*mongodb_client_, database);
240 
241  mongo::GridFile file = gridfs.findFile(filename);
242  if (!file.exists()) {
243  logger_->log_warn(name_, "Grid file does not exist");
244  return;
245  }
246 
247  size_t bytes = 0;
248  for (int c = 0; c < file.getNumChunks(); ++c) {
249  mongo::GridFSChunk chunk = file.getChunk(c);
250  int len = 0;
251  const char * chunk_data = chunk.data(len);
252  memcpy(tmp, chunk_data, len);
253  tmp += len;
254  bytes += len;
255  //logger_->log_info(name_, "Read chunk %i of %i bytes", c, len);
256  }
257  //logger_->log_info(name_, "%zu bytes restored", bytes);
258  }
259 
260  /** Retrieve point clouds from database.
261  * @param times timestamps for when to read the point clouds. The method
262  * will retrieve the point clouds with the minimum difference between the
263  * desired and actual times.
264  * @param actual_times upon return contains the actual times of the point
265  * clouds retrieved based on the desired @p times.
266  * @param database name of the database to retrieve data from
267  * @param collection name of the collection to retrieve data from.
268  * @return vector of shared pointers to retrieved point clouds
269  */
270  std::vector<CloudPtr>
271  retrieve_clouds(std::vector<long long> &times,
272  std::vector<long long> &actual_times,
273  std::string & database,
274  std::string & collection)
275  {
276 #ifdef HAVE_MONGODB_VERSION_H
277  // mongodb-cxx-driver dropped ensureIndex and names it createIndex
278  mongodb_client_->createIndex(database + "." + collection, mongo::fromjson("{timestamp:1}"));
279 #else
280  mongodb_client_->ensureIndex(database + "." + collection, mongo::fromjson("{timestamp:1}"));
281 #endif
282 
283  const unsigned int num_clouds = times.size();
284  std::vector<CloudPtr> pcls(num_clouds);
285 
286  // retrieve point clouds
287  for (unsigned int i = 0; i < num_clouds; ++i) {
288 #if __cplusplus >= 201103L
289  std::unique_ptr<mongo::DBClientCursor> cursor =
290 #else
291  std::auto_ptr<mongo::DBClientCursor> cursor =
292 #endif
293  mongodb_client_->query(database + "." + collection,
294  QUERY("timestamp" << mongo::LTE << times[i] << mongo::GTE
295  << (times[i] - cfg_pcl_age_tolerance_))
296  .sort("timestamp", -1),
297  /* limit */ 1);
298 
299  if (cursor->more()) {
300  mongo::BSONObj p = cursor->next();
301  mongo::BSONObj pcldoc = p.getObjectField("pointcloud");
302  std::vector<mongo::BSONElement> fields = pcldoc["field_info"].Array();
303 
304  long long timestamp = p["timestamp"].Long();
305  double age = (double)(times[i] - timestamp) / 1000.;
306  logger_->log_info(name_, "Restoring point cloud at %lli with age %f sec", timestamp, age);
307 
308  // reconstruct point cloud
309  CloudPtr lpcl(new Cloud());
310  pcls[i] = lpcl;
311 
312  actual_times[i] = p["timestamp"].Number();
313  fawkes::Time actual_time((long)actual_times[i]);
314 
315  lpcl->header.frame_id = pcldoc["frame_id"].String();
316  lpcl->is_dense = pcldoc["is_dense"].Bool();
317  lpcl->width = pcldoc["width"].Int();
318  lpcl->height = pcldoc["height"].Int();
319  fawkes::pcl_utils::set_time(lpcl, actual_time);
320  lpcl->points.resize(pcldoc["num_points"].Int());
321 
322  read_gridfs_file(&lpcl->points[0],
323  database,
324  pcldoc.getFieldDotted("data.filename").String());
325  } else {
326  logger_->log_warn(name_, "Cannot retrieve document for time %lli", times[i]);
327  return std::vector<CloudPtr>();
328  }
329  }
330 
331  return pcls;
332  }
333 
334 protected: // members
335  const char *name_; /**< Name of the pipeline. */
336 
337  long cfg_pcl_age_tolerance_; /**< Age tolerance for retrieved point clouds. */
338  long cfg_transform_range_[2]; /**< Transform range start and end times. */
339 
340  mongo::DBClientBase *mongodb_client_; /**< MongoDB client to retrieve data. */
341 
342  fawkes::Logger *logger_; /**< Logger for informative messages. */
343 
344  ColorCloudPtr output_; /**< The final (colored) output of the pipeline. */
345 };
346 
347 /** Convert applicability status to readable string.
348  * @param status the status to convert
349  * @return readable string for status
350  */
351 inline const char *
352 to_string(ApplicabilityStatus status)
353 {
354  switch (status) {
355  case APPLICABLE: return "Applicable";
356  case TYPE_MISMATCH: return "PointCloud in database does not match type";
357  case NO_POINTCLOUD: return "For at least one time no pointcloud found";
358  case QUERY_FAILED: return "MongoDB query failed";
359  default: return "Unknown error";
360  }
361 }
362 
363 #endif
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Cloud::Ptr CloudPtr
Shared pointer to cloud.
pcl::PointCloud< PointType > Cloud
Basic point cloud type.
mongo::DBClientBase * mongodb_client_
MongoDB client to retrieve data.
A class for handling time.
Definition: time.h:92
virtual ~PointCloudDBPipeline()
Destructor.
long cfg_pcl_age_tolerance_
Age tolerance for retrieved point clouds.
pcl::PointCloud< ColorPointType > ColorCloud
Type for colored point clouds based on ColorPointType.
void read_gridfs_file(void *dataptr, std::string &database, std::string filename)
Read a file from MongoDB GridFS.
pcl::PointXYZRGB ColorPointType
Colored point type.
Cloud::ConstPtr CloudConstPtr
Shared pointer to constant cloud.
Database point cloud pipeline base class.
fawkes::Logger * logger_
Logger for informative messages.
ColorCloud::Ptr ColorCloudPtr
Shared pointer to colored cloud.
Base class for exceptions in Fawkes.
Definition: exception.h:35
PointCloudDBPipeline(mongo::DBClientBase *mongodb_client, fawkes::Configuration *config, fawkes::Logger *logger, ColorCloudPtr output)
Constructor.
const char * name_
Name of the pipeline.
virtual std::vector< float > get_floats(const char *path)=0
Get list of values from configuration which is of type float.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
ColorCloudPtr output_
The final (colored) output of the pipeline.
ColorCloud::ConstPtr ColorCloudConstPtr
Shared pointer to constant colored cloud.
long cfg_transform_range_[2]
Transform range start and end times.
Interface for configuration handling.
Definition: config.h:64
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
ApplicabilityStatus applicable(std::vector< long long > &times, std::string &database, std::string &collection)
Check if this pipeline instance is suitable for the given times.
std::vector< CloudPtr > retrieve_clouds(std::vector< long long > &times, std::vector< long long > &actual_times, std::string &database, std::string &collection)
Retrieve point clouds from database.
Interface for logging.
Definition: logger.h:41