Fawkes API  Fawkes Development Version
mongodb_instance_config.cpp
1 
2 /***************************************************************************
3  * mongodb_instance_config.cpp - MongoDB instance configuration
4  *
5  * Created: Wed Jul 12 14:33:02 2017
6  * Copyright 2006-2017 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "mongodb_instance_config.h"
23 
24 #include <config/config.h>
25 #include <core/exceptions/system.h>
26 #include <mongo/client/dbclient.h>
27 #include <utils/sub_process/proc.h>
28 #include <utils/time/wait.h>
29 
30 #include <boost/filesystem.hpp>
31 #include <chrono>
32 #include <numeric>
33 #include <wordexp.h>
34 
35 using namespace fawkes;
36 using namespace std::chrono_literals;
37 
38 /** @class MongoDBInstanceConfig "mongodb_instance_config.h"
39  * MongoDB Instances Configuration.
40  * Configure a single mongod instance that can be run as a sub-process.
41  *
42  * @author Tim Niemueller
43  */
44 
45 /** Constructor.
46  * This will read the given configuration.
47  * @param config configuration to query
48  * @param cfgname configuration name
49  * @param prefix configuration path prefix
50  */
52  std::string cfgname,
53  std::string prefix)
54 : Thread("MongoDBInstance", Thread::OPMODE_CONTINUOUS)
55 {
56  set_name("MongoDBInstance|%s", cfgname.c_str());
57  config_name_ = cfgname;
58 
59  running_ = false;
60 
61  enabled_ = false;
62  try {
63  enabled_ = config->get_bool(prefix + "enabled");
64  } catch (Exception &e) {
65  }
66 
67  if (enabled_) {
68  startup_grace_period_ = 10;
69  try {
70  startup_grace_period_ = config->get_uint(prefix + "startup-grace-period");
71  } catch (Exception &e) {
72  } // ignored, use default
73  loop_interval_ = 5.0;
74  try {
75  loop_interval_ = config->get_float(prefix + "loop-interval");
76  } catch (Exception &e) {
77  } // ignored, use default
78  termination_grace_period_ = config->get_uint(prefix + "termination-grace-period");
79  clear_data_on_termination_ = config->get_bool(prefix + "clear-data-on-termination");
80  port_ = config->get_uint(prefix + "port");
81  data_path_ = config->get_string(prefix + "data-path");
82  log_path_ = config->get_string(prefix + "log/path");
83  log_append_ = config->get_bool(prefix + "log/append");
84  try {
85  replica_set_ = config->get_string(prefix + "replica-set");
86  ;
87  } catch (Exception &e) {
88  } // ignored, no replica set
89  if (!replica_set_.empty()) {
90  oplog_size_ = 0;
91  try {
92  oplog_size_ = config->get_uint(prefix + "oplog-size");
93  } catch (Exception &e) {
94  } // ignored, use default
95  }
96  }
97 
98  argv_ = {"mongod", "--port", std::to_string(port_), "--dbpath", data_path_};
99 
100  if (!log_path_.empty()) {
101  if (log_append_) {
102  argv_.push_back("--logappend");
103  }
104  argv_.push_back("--logpath");
105  argv_.push_back(log_path_);
106  }
107 
108  if (!replica_set_.empty()) {
109  argv_.push_back("--replSet");
110  argv_.push_back(replica_set_);
111  if (oplog_size_ > 0) {
112  argv_.push_back("--oplogSize");
113  argv_.push_back(std::to_string(oplog_size_));
114  }
115  }
116 
117  if (enabled_) {
118  std::string extra_args = config->get_string_or_default((prefix + "args").c_str(), "");
119  if (!extra_args.empty()) {
120  wordexp_t p;
121  int wrv = wordexp(extra_args.c_str(), &p, WRDE_NOCMD | WRDE_UNDEF);
122  switch (wrv) {
123  case 0: break; // all good
124  case WRDE_BADCHAR: throw Exception("%s: invalid character in args", name());
125  case WRDE_BADVAL: throw Exception("%s: undefined variable referenced in args", name());
126  case WRDE_CMDSUB:
127  throw Exception("%s: running sub-commands has been disabled for args", name());
128  case WRDE_NOSPACE: throw OutOfMemoryException("Cannot parse args");
129  case WRDE_SYNTAX: throw Exception("%s: shell syntax error in args", name());
130  default: throw Exception("Unexpected wordexp error %d when parsing args", wrv);
131  }
132 
133  // These arguments may not be passed, they are either configured through
134  // config values and could interfere, or they mess with our rs handling.
135  std::vector<std::string> invalid_args = {"--port",
136  "--dbpath",
137  "--fork",
138  "--logappend",
139  "--logpath",
140  "--replSet",
141  "--oplogSize",
142  "--master",
143  "--slave",
144  "--source",
145  "--only"};
146 
147  // pass and verify arguments to be added to command line
148  for (size_t i = 0; i < p.we_wordc; ++i) {
149  for (size_t j = 0; j < invalid_args.size(); ++j) {
150  if (invalid_args[j] == p.we_wordv[i]) {
151  wordfree(&p);
152  throw Exception("%s: %s may not be passed in args", name(), invalid_args[j].c_str());
153  }
154  }
155  argv_.push_back(p.we_wordv[i]);
156  }
157  wordfree(&p);
158  }
159  }
160 
161  command_line_ = std::accumulate(std::next(argv_.begin()),
162  argv_.end(),
163  argv_.front(),
164  [](std::string &s, const std::string &a) { return s + " " + a; });
165 }
166 
167 void
169 {
170  if (enabled_) {
171  logger->log_debug(name(), "enabled: true");
172  logger->log_debug(name(), "TCP port: %u", port_);
173  logger->log_debug(name(), "Termination grace period: %u", termination_grace_period_);
174  logger->log_debug(name(),
175  "clear data on termination: %s",
176  clear_data_on_termination_ ? "yes" : "no");
177  logger->log_debug(name(), "data path: %s", data_path_.c_str());
178  logger->log_debug(name(), "log path: %s", log_path_.c_str());
179  logger->log_debug(name(), "log append: %s", log_append_ ? "yes" : "no");
180  logger->log_debug(name(),
181  "replica set: %s",
182  replica_set_.empty() ? "DISABLED" : replica_set_.c_str());
183  if (!replica_set_.empty()) {
184  logger->log_debug(name(), "Op Log Size: %u MB", oplog_size_);
185  }
186 
187  start_mongod();
188  } else {
189  throw Exception("Instance '%s' cannot be started while disabled", name());
190  }
191 
192  timewait_ = new TimeWait(clock, (int)(loop_interval_ * 1000000.));
193 }
194 
195 void
197 {
198  timewait_->mark_start();
199  if (!running_ || !check_alive()) {
200  logger->log_error(name(), "MongoDB dead, restarting");
201  // on a crash, clean to make sure
202  try {
203  kill_mongod(true);
204  start_mongod();
205  } catch (Exception &e) {
206  logger->log_error(name(), "Failed to start MongoDB: %s", e.what_no_backtrace());
207  }
208  }
209  timewait_->wait_systime();
210 }
211 
212 void
214 {
215  kill_mongod(clear_data_on_termination_);
216  delete timewait_;
217 }
218 
219 /** Get command line used to execute program.
220  * @return command line to run mongod
221  */
222 std::string
224 {
225  return command_line_;
226 }
227 
228 /** Get termination grace period.
229  * @return termination grace period
230  */
231 unsigned int
233 {
234  return termination_grace_period_;
235 }
236 
237 bool
238 MongoDBInstanceConfig::check_alive()
239 {
240  try {
241  std::shared_ptr<mongo::DBClientConnection> client =
242  std::make_shared<mongo::DBClientConnection>();
243  std::string errmsg;
244  mongo::HostAndPort hostport("localhost", port_);
245  if (!client->connect(hostport, errmsg)) {
246  return false;
247  }
248  mongo::BSONObj cmd(BSON("isMaster" << 1));
249  mongo::BSONObj reply;
250  bool ok = client->runCommand("admin", cmd, reply);
251  if (!ok) {
252  logger->log_warn(name(), "Failed to connect: %s", reply.jsonString().c_str());
253  }
254  return ok;
255  } catch (mongo::DBException &e) {
256  logger->log_warn(name(), "Fail: %s", e.what());
257  return false;
258  }
259 }
260 
261 /** Start mongod. */
262 void
264 {
265  if (running_)
266  return;
267 
268  if (check_alive()) {
269  logger->log_warn(name(), "MongoDB already running, not starting");
270  running_ = true;
271  return;
272  }
273 
274  try {
275  boost::filesystem::create_directories(data_path_);
276  } catch (boost::filesystem::filesystem_error &e) {
277  throw Exception("Failed to create data path '%s' for mongod(%s): %s",
278  data_path_.c_str(),
279  config_name_.c_str(),
280  e.what());
281  }
282 
283  if (!log_path_.empty()) {
284  boost::filesystem::path p(log_path_);
285  try {
286  boost::filesystem::create_directories(p.parent_path());
287  } catch (boost::filesystem::filesystem_error &e) {
288  throw Exception("Failed to create log path '%s' for mongod(%s): %s",
289  p.parent_path().string().c_str(),
290  config_name_.c_str(),
291  e.what());
292  }
293  }
294 
295  std::string progname = "mongod(" + config_name_ + ")";
296  proc_ =
297  std::make_shared<SubProcess>(progname, "mongod", argv_, std::vector<std::string>{}, logger);
298 
299  for (unsigned i = 0; i < startup_grace_period_ * 4; ++i) {
300  if (check_alive()) {
301  running_ = true;
302  return;
303  }
304  std::this_thread::sleep_for(250ms);
305  }
306  if (!running_) {
307  proc_.reset();
308  throw Exception("%s: instance did not start in time", name());
309  }
310 }
311 
312 /** Stop mongod.
313  * This send a SIGINT and then wait for the configured grace period
314  * before sending the TERM signal.
315  * @param clear_data true to clear data, false otherwise
316  */
317 void
319 {
320  if (proc_) {
321  proc_->kill(SIGINT);
322  for (unsigned i = 0; i < termination_grace_period_; ++i) {
323  if (!proc_->alive())
324  break;
325  std::this_thread::sleep_for(1s);
326  }
327  // This will send the term signal
328  proc_.reset();
329  running_ = false;
330  if (clear_data) {
331  try {
332  boost::filesystem::remove_all(data_path_);
333  } catch (boost::filesystem::filesystem_error &e) {
334  throw Exception("Failed to create data path '%s' for mongod(%s): %s",
335  data_path_.c_str(),
336  config_name_.c_str(),
337  e.what());
338  }
339  }
340  }
341 }
MongoDBInstanceConfig(fawkes::Configuration *config, std::string cfgname, std::string prefix)
Constructor.
void start_mongod()
Start mongod.
virtual void finalize()
Finalize the thread.
Fawkes library namespace.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
unsigned int termination_grace_period() const
Get termination grace period.
Thread class encapsulation of pthreads.
Definition: thread.h:45
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
void wait_systime()
Wait until minimum loop time has been reached in real time.
Definition: wait.cpp:96
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
virtual void init()
Initialize the thread.
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:748
Base class for exceptions in Fawkes.
Definition: exception.h:35
void kill_mongod(bool clear_data)
Stop mongod.
const char * name() const
Get name of thread.
Definition: thread.h:100
virtual const char * what_no_backtrace() const
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:663
std::string command_line() const
Get command line used to execute program.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
virtual void loop()
Code to execute in the thread.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual std::string get_string_or_default(const char *path, const std::string &default_val)
Get value from configuration which is of type string, or the given default if the path does not exist...
Definition: config.cpp:736
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
Interface for configuration handling.
Definition: config.h:64
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
Time wait utility.
Definition: wait.h:32
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:31
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.