Fawkes API  Fawkes Development Version
mongodb_log_image_thread.cpp
1 
2 /***************************************************************************
3  * mongodb_log_image_thread.cpp - Thread to log images to MongoDB
4  *
5  * Created: Tue Apr 10 22:12:38 2012
6  * Copyright 2011-2017 Tim Niemueller [www.niemueller.de]
7  * 2012 Bastian Klingen
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "mongodb_log_image_thread.h"
24 
25 #include <core/threading/mutex_locker.h>
26 #include <fvutils/color/colorspaces.h>
27 #include <fvutils/ipc/shm_image.h>
28 #include <utils/time/wait.h>
29 
30 // from MongoDB
31 #include <mongo/client/dbclient.h>
32 #include <mongo/client/gridfs.h>
33 
34 #include <fnmatch.h>
35 
36 using namespace fawkes;
37 using namespace firevision;
38 using namespace mongo;
39 
40 /** @class MongoLogImagesThread "mongodb_log_image_thread.h"
41  * Thread to export Fawkes images to MongoDB.
42  * @author Tim Niemueller
43  * @author Bastian Klingen
44  */
45 
46 /** Constructor. */
48 : Thread("MongoLogImagesThread", Thread::OPMODE_CONTINUOUS), MongoDBAspect("default")
49 {
51 }
52 
53 /** Destructor. */
55 {
56 }
57 
58 void
60 {
61  database_ = "fflog";
62  try {
63  database_ = config->get_string("/plugins/mongodb-log/database");
64  } catch (Exception &e) {
65  logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
66  }
67 
68  cfg_storage_interval_ = config->get_float("/plugins/mongodb-log/images/storage-interval");
69 
70  cfg_chunk_size_ = 2097152; // 2 MB
71  try {
72  cfg_chunk_size_ = config->get_uint("/plugins/mongodb-log/images/chunk-size");
73  } catch (Exception &e) {
74  } // ignored, use default
75  logger->log_info(name(), "Chunk size: %u", cfg_chunk_size_);
76 
77  try {
78  includes_ = config->get_strings("/plugins/mongodb-log/images/includes");
79  } catch (Exception &e) {
80  } // ignored, no include rules
81  try {
82  excludes_ = config->get_strings("/plugins/mongodb-log/images/excludes");
83  } catch (Exception &e) {
84  } // ignored, no include rules
85 
86  mongodb_ = mongodb_client;
87  gridfs_ = new GridFS(*mongodb_, database_);
88 
89  last_update_ = new Time(clock);
90  now_ = new Time(clock);
91  wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
92  mutex_ = new Mutex();
93  update_images();
94 }
95 
96 bool
98 {
99  mutex_->lock();
100  return true;
101 }
102 
103 void
105 {
106  logger->log_debug(name(), "Finalizing MongoLogImagesThread");
107  std::map<std::string, ImageInfo>::iterator p;
108  for (p = imgs_.begin(); p != imgs_.end(); ++p) {
109  delete p->second.img;
110  }
111  imgs_.clear();
112  delete gridfs_;
113  delete wait_;
114  delete mutex_;
115  delete now_;
116  delete last_update_;
117 }
118 
119 void
121 {
122  MutexLocker lock(mutex_);
123  fawkes::Time loop_start(clock);
124  wait_->mark_start();
125  unsigned int num_stored = 0;
126 
127  now_->stamp();
128  if (*now_ - last_update_ >= 5.0) {
129  *last_update_ = now_;
130  update_images();
131  }
132 
133  std::map<std::string, ImageInfo>::iterator p;
134  for (p = imgs_.begin(); p != imgs_.end(); ++p) {
135  ImageInfo &imginfo = p->second;
136 
137  fawkes::Time cap_time = imginfo.img->capture_time();
138 
139  if ((imginfo.last_sent != cap_time)) {
140  BSONObjBuilder document;
141  imginfo.last_sent = cap_time;
142  document.append("timestamp", (long long)cap_time.in_msec());
143 
144  BSONObjBuilder subb(document.subobjStart("image"));
145  subb.append("image_id", imginfo.img->image_id());
146  subb.append("width", imginfo.img->width());
147  subb.append("height", imginfo.img->height());
148  subb.append("colorspace", colorspace_to_string(imginfo.img->colorspace()));
149 
150  std::stringstream name;
151  name << imginfo.topic_name << "_" << cap_time.in_msec();
152  subb.append("data",
153  gridfs_->storeFile((char *)imginfo.img->buffer(),
154  imginfo.img->data_size(),
155  name.str()));
156 
157  subb.doneFast();
158  collection_ = database_ + "." + imginfo.topic_name;
159  try {
160  mongodb_->insert(collection_, document.obj());
161  ++num_stored;
162  } catch (mongo::DBException &e) {
163  logger->log_warn(this->name(),
164  "Failed to insert image %s into %s: %s",
165  imginfo.img->image_id(),
166  collection_.c_str(),
167  e.what());
168  }
169  }
170  }
171 
172  mutex_->unlock();
173  fawkes::Time loop_end(clock);
174  logger->log_debug(name(),
175  "Stored %u of %zu images in %.1f ms",
176  num_stored,
177  imgs_.size(),
178  (loop_end - &loop_start) * 1000.);
179  wait_->wait();
180 }
181 
182 void
183 MongoLogImagesThread::update_images()
184 {
185  std::set<std::string> missing_images;
186  std::set<std::string> unbacked_images;
187  get_sets(missing_images, unbacked_images);
188 
189  if (!unbacked_images.empty()) {
190  std::set<std::string>::iterator i;
191  for (i = unbacked_images.begin(); i != unbacked_images.end(); ++i) {
192  logger->log_info(name(),
193  "Shutting down MongoLog for no longer available image %s",
194  i->c_str());
195  ImageInfo &imginfo = imgs_[*i];
196  delete imginfo.img;
197  imgs_.erase(*i);
198  }
199  }
200 
201  if (!missing_images.empty()) {
202  std::set<std::string>::iterator i;
203  for (i = missing_images.begin(); i != missing_images.end(); ++i) {
204  std::vector<std::string>::iterator f;
205  bool include = includes_.empty();
206  if (!include) {
207  for (f = includes_.begin(); f != includes_.end(); ++f) {
208  if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
209  include = true;
210  break;
211  }
212  }
213  }
214  if (include) {
215  for (f = excludes_.begin(); f != excludes_.end(); ++f) {
216  if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
217  include = false;
218  break;
219  }
220  }
221  }
222  if (!include) {
223  //logger->log_info(name(), "Excluding image %s", i->c_str());
224  continue;
225  }
226 
227  logger->log_info(name(), "Starting to log image %s", i->c_str());
228 
229  std::string topic_name = std::string("Images.") + *i;
230  size_t pos = 0;
231  while ((pos = topic_name.find_first_of(" -", pos)) != std::string::npos) {
232  topic_name.replace(pos, 1, "_");
233  pos = pos + 1;
234  }
235 
236  ImageInfo imginfo;
237  imginfo.topic_name = topic_name;
238  imginfo.img = new SharedMemoryImageBuffer(i->c_str());
239  imgs_[*i] = imginfo;
240  }
241  }
242 }
243 
244 void
245 MongoLogImagesThread::get_sets(std::set<std::string> &missing_images,
246  std::set<std::string> &unbacked_images)
247 {
248  std::set<std::string> published_images;
249  std::map<std::string, ImageInfo>::iterator p;
250  for (p = imgs_.begin(); p != imgs_.end(); ++p) {
251  if (p->second.img->num_attached() > 1) {
252  published_images.insert(p->first);
253  }
254  }
255 
256  std::set<std::string> image_buffers;
258  SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
259  SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
260 
261  while (i != endi) {
263  dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
264  if (ih) {
265  image_buffers.insert(ih->image_id());
266  }
267  ++i;
268  }
269  delete h;
270 
271  missing_images.clear();
272  unbacked_images.clear();
273 
274  std::set_difference(image_buffers.begin(),
275  image_buffers.end(),
276  published_images.begin(),
277  published_images.end(),
278  std::inserter(missing_images, missing_images.end()));
279 
280  std::set_difference(published_images.begin(),
281  published_images.end(),
282  image_buffers.begin(),
283  image_buffers.end(),
284  std::inserter(unbacked_images, unbacked_images.end()));
285 }
void wait()
Wait until minimum loop time has been reached.
Definition: wait.cpp:78
Shared memory image buffer header.
Definition: shm_image.h:66
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
std::optional< int64_t > height() const
Get height value.
Definition: ImageInfo.h:194
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
std::optional< int64_t > width() const
Get width value.
Definition: ImageInfo.h:177
Mutex locking helper.
Definition: mutex_locker.h:33
Shared Memory iterator.
Definition: shm.h:118
mongo::DBClientBase * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:55
virtual ~MongoLogImagesThread()
Destructor.
A class for handling time.
Definition: time.h:92
Thread class encapsulation of pthreads.
Definition: thread.h:45
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
virtual void finalize()
Finalize the thread.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:228
virtual void init()
Initialize the thread.
Base class for exceptions in Fawkes.
Definition: exception.h:35
const char * image_id() const
Get image number.
Definition: shm_image.cpp:838
Thread aspect to access MongoDB.
Definition: mongodb.h:39
virtual void loop()
Code to execute in the thread.
Shared memory image buffer.
Definition: shm_image.h:183
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
const char * name() const
Get name of thread.
Definition: thread.h:100
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
std::optional< std::string > colorspace() const
Get colorspace value.
Definition: ImageInfo.h:143
void lock()
Lock this mutex.
Definition: mutex.cpp:87
Time & stamp()
Set this time to the current time.
Definition: time.cpp:704
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Mutex mutual exclusion lock.
Definition: mutex.h:32
ImageInfo representation for JSON transfer.
Definition: ImageInfo.h:26
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
Time wait utility.
Definition: wait.h:32
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.