23 #include "mongodb_log_pcl_thread.h" 26 #include <core/threading/mutex_locker.h> 27 #include <utils/time/wait.h> 30 #include <mongo/client/dbclient.h> 31 #include <mongo/client/gridfs.h> 37 using namespace mongo;
67 cfg_storage_interval_ =
config->
get_float(
"/plugins/mongodb-log/pointclouds/storage-interval");
69 cfg_chunk_size_ = 2097152;
71 cfg_chunk_size_ =
config->
get_uint(
"/plugins/mongodb-log/pointclouds/chunk-size");
76 cfg_flush_after_write_ =
false;
78 cfg_flush_after_write_ =
config->
get_uint(
"/plugins/mongodb-log/pointclouds/flush-after-write");
82 std::vector<std::string> includes;
87 std::vector<std::string> excludes;
95 gridfs_ =
new GridFS(*mongodb_, database_);
102 std::vector<std::string>::iterator p;
103 std::vector<std::string>::iterator f;
104 for (p = pcls.begin(); p != pcls.end(); ++p) {
105 bool include = includes.empty();
107 for (f = includes.begin(); f != includes.end(); ++f) {
108 if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
116 for (f = excludes.begin(); f != excludes.end(); ++f) {
117 if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
131 std::string topic_name = std::string(
"PointClouds.") + *p;
133 while ((pos = topic_name.find_first_of(
" -", pos)) != std::string::npos) {
134 topic_name.replace(pos, 1,
"_");
138 pi.topic_name = topic_name;
142 std::string frame_id;
143 unsigned int width, height;
146 adapter_->
get_info(*p, width, height, frame_id, is_dense, fieldinfo);
147 pi.msg.header.frame_id = frame_id;
148 pi.msg.width = width;
149 pi.msg.height = height;
150 pi.msg.is_dense = is_dense;
151 pi.msg.fields.clear();
152 pi.msg.fields.resize(fieldinfo.size());
153 for (
unsigned int i = 0; i < fieldinfo.size(); ++i) {
154 pi.msg.fields[i].name = fieldinfo[i].name;
155 pi.msg.fields[i].offset = fieldinfo[i].offset;
156 pi.msg.fields[i].datatype = fieldinfo[i].datatype;
157 pi.msg.fields[i].count = fieldinfo[i].count;
163 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
164 mutex_ =
new Mutex();
189 std::map<std::string, PointCloudInfo>::iterator p;
190 unsigned int num_stored = 0;
191 for (p = pcls_.begin(); p != pcls_.end(); ++p) {
192 PointCloudInfo &pi = p->second;
193 std::string frame_id;
194 unsigned int width, height;
196 size_t point_size, num_points;
199 p->first, frame_id, width, height, time, &point_data, point_size, num_points);
200 size_t data_size = point_size * num_points;
202 if (pi.last_sent != time) {
207 BSONObjBuilder document;
208 document.append(
"timestamp", (
long long)time.
in_msec());
209 BSONObjBuilder subb(document.subobjStart(
"pointcloud"));
210 subb.append(
"frame_id", pi.msg.header.frame_id);
211 subb.append(
"is_dense", pi.msg.is_dense);
212 subb.append(
"width", width);
213 subb.append(
"height", height);
214 subb.append(
"point_size", (
unsigned int)point_size);
215 subb.append(
"num_points", (
unsigned int)num_points);
217 std::stringstream
name;
219 subb.append(
"data", gridfs_->storeFile((
char *)point_data, data_size,
name.str()));
221 BSONArrayBuilder subb2(subb.subarrayStart(
"field_info"));
222 for (
unsigned int i = 0; i < pi.msg.fields.size(); i++) {
223 BSONObjBuilder fi(subb2.subobjStart());
224 fi.append(
"name", pi.msg.fields[i].name);
225 fi.append(
"offset", pi.msg.fields[i].offset);
226 fi.append(
"datatype", pi.msg.fields[i].datatype);
227 fi.append(
"count", pi.msg.fields[i].count);
232 collection_ = database_ +
"." + pi.topic_name;
234 mongodb_->insert(collection_, document.obj());
236 }
catch (mongo::DBException &e) {
238 "Failed to insert into %s: %s",
244 float diff = (end - &
start) * 1000.;
246 "Stored point cloud %s (time %li) in %.1f ms",
260 "Stored %u of %zu point clouds in %.1f ms",
263 (loop_end - &loop_start) * 1000.);
265 if (cfg_flush_after_write_) {
267 BSONObjBuilder flush_cmd;
269 flush_cmd.append(
"fsync", 1);
270 flush_cmd.append(
"async", 1);
272 if (reply.hasField(
"ok")) {
273 if (!reply[
"ok"].trueValue()) {
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
void wait()
Wait until minimum loop time has been reached.
std::vector< std::string > get_pointcloud_list() const
Get list of point cloud IDs.
MongoLogPointCloudThread()
Constructor.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Fawkes library namespace.
void unlock()
Unlock the mutex.
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
A class for handling time.
Thread class encapsulation of pthreads.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Logger * logger
This is the Logger member used to access the logger.
Clock * clock
By means of this member access to the clock is given.
long in_msec() const
Convert the stored time into milli-seconds.
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Base class for exceptions in Fawkes.
Thread aspect to access MongoDB.
const char * name() const
Get name of thread.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void get_info(const std::string &id, unsigned int &width, unsigned int &height, std::string &frame_id, bool &is_dense, V_PointFieldInfo &pfi)
Get info about point cloud.
void mark_start()
Mark start of loop.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
Point cloud adapter class.
virtual ~MongoLogPointCloudThread()
Destructor.
void lock()
Lock this mutex.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Mutex mutual exclusion lock.
virtual void finalize()
Finalize the thread.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
Configuration * config
This is the Configuration member used to access the configuration.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual void loop()
Code to execute in the thread.
void get_data(const std::string &id, std::string &frame_id, unsigned int &width, unsigned int &height, fawkes::Time &time, void **data_ptr, size_t &point_size, size_t &num_points)
Get current data of point cloud.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
void start(bool wait=true)
Call this method to start the thread.
virtual void init()
Initialize the thread.