22 #include "robot_memory.h" 24 #include <core/threading/mutex.h> 25 #include <core/threading/mutex_locker.h> 26 #include <interfaces/RobotMemoryInterface.h> 27 #include <utils/misc/string_conversions.h> 28 #include <utils/misc/string_split.h> 29 #include <utils/system/hostinfo.h> 36 #include <mongo/client/dbclient.h> 38 using namespace mongo;
69 mongo_connection_manager_ = mongo_connection_manager;
70 blackboard_ = blackboard;
71 mongodb_client_local_ =
nullptr;
72 mongodb_client_distributed_ =
nullptr;
76 RobotMemory::~RobotMemory()
78 mongo_connection_manager_->delete_client(mongodb_client_local_);
79 mongo_connection_manager_->delete_client(mongodb_client_distributed_);
80 delete trigger_manager_;
81 blackboard_->close(rm_if_);
88 log(
"Started RobotMemory");
89 default_collection_ =
"robmem.test";
91 default_collection_ = config_->get_string(
"/plugins/robot-memory/default-collection");
95 debug_ = config_->get_bool(
"/plugins/robot-memory/more-debug-output");
98 database_name_ =
"robmem";
100 database_name_ = config_->get_string(
"/plugins/robot-memory/database");
103 distributed_dbs_ = config_->get_strings(
"/plugins/robot-memory/distributed-db-names");
104 cfg_startup_grace_period_ = 10;
106 cfg_startup_grace_period_ = config_->get_uint(
"/plugins/robot-memory/startup-grace-period");
110 cfg_coord_database_ = config_->get_string(
"/plugins/robot-memory/coordination/database");
111 cfg_coord_mutex_collection_ =
112 config_->get_string(
"/plugins/robot-memory/coordination/mutex-collection");
113 cfg_coord_mutex_collection_ = cfg_coord_database_ +
"." + cfg_coord_mutex_collection_;
115 using namespace std::chrono_literals;
118 log(
"Connect to local mongod");
119 unsigned int startup_tries = 0;
120 for (; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
122 mongodb_client_local_ = mongo_connection_manager_->create_client(
"robot-memory-local");
125 logger_->log_info(name_,
"Waiting for local");
126 std::this_thread::sleep_for(500ms);
130 if (config_->exists(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")
131 && config_->get_bool(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
133 log(
"Connect to distributed mongod");
134 for (startup_tries = 0; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
136 mongodb_client_distributed_ =
137 mongo_connection_manager_->create_client(
"robot-memory-distributed");
140 logger_->log_info(name_,
"Waiting for distributed");
141 std::this_thread::sleep_for(500ms);
147 rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(
148 config_->get_string(
"/plugins/robot-memory/interface-name").c_str());
149 rm_if_->set_error(
"");
150 rm_if_->set_result(
"");
157 log_deb(
"Initialized RobotMemory");
163 trigger_manager_->check_events();
164 computables_manager_->cleanup_computed_docs();
176 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
177 log_deb(std::string(
"Executing Query " +
query.toString() +
" on collection " + collection));
180 computables_manager_->check_and_compute(
query, collection);
186 query.readPref(ReadPreference_Nearest, BSONArray());
191 cursor = mongodb_client->query(collection,
query);
193 logger_->log_error(name_,
"Connection failed %s", collection.c_str());
195 }
catch (DBException &e) {
197 std::string(
"Error for query ") +
query.toString() +
"\n Exception: " + e.toString();
213 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
214 log_deb(std::string(
"Executing Aggregation on collection " + collection));
226 size_t point_pos = collection.find(
".");
227 if (point_pos == std::string::npos) {
228 logger_->log_error(name_,
"Collection %s needs to start with 'dbname.'", collection.c_str());
229 return fromjson(
"{}");
231 std::string db = collection.substr(0, point_pos);
232 std::string col = collection.substr(point_pos + 1);
234 mongodb_client->runCommand(db, BSON(
"aggregate" << col <<
"pipeline" << pipeline), res);
235 }
catch (DBException &e) {
236 std::string error = std::string(
"Error for aggregation ") +
"\n Exception: " + e.toString();
238 return fromjson(
"{}");
252 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
254 log_deb(std::string(
"Inserting " + obj.toString() +
" into collection " + collection));
261 mongodb_client->insert(collection, obj);
262 }
catch (DBException &e) {
263 std::string error =
"Error for insert " + obj.toString() +
"\n Exception: " + e.toString();
264 log_deb(error,
"error");
280 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
282 log_deb(std::string(
"Creating index " + obj.toString() +
" on collection " + collection));
289 mongodb_client->createIndex(collection, mongo::IndexSpec().addKeys(obj).unique(unique));
290 }
catch (DBException &e) {
292 "Error when creating index " + obj.toString() +
"\n Exception: " + e.toString();
293 log_deb(error,
"error");
309 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
311 std::string insert_string =
"[";
312 for (BSONObj obj : v_obj) {
313 insert_string += obj.toString() +
",\n";
315 insert_string +=
"]";
317 log_deb(std::string(
"Inserting vector of documents " + insert_string +
" into collection " 325 mongodb_client->insert(collection, v_obj);
326 }
catch (DBException &e) {
327 std::string error =
"Error for insert " + insert_string +
"\n Exception: " + e.toString();
328 log_deb(error,
"error");
344 return insert(fromjson(obj_str), collection);
358 const std::string &collection,
361 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
362 log_deb(std::string(
"Executing Update " +
update.toString() +
" for query " +
query.toString()
363 +
" on collection " + collection));
370 mongodb_client->update(collection,
query,
update, upsert);
371 }
catch (DBException &e) {
372 log_deb(std::string(
"Error for update " +
update.toString() +
" for query " +
query.toString()
373 +
"\n Exception: " + e.toString()),
391 const std::string &update_str,
392 const std::string &collection,
395 return update(
query, fromjson(update_str), collection, upsert);
409 const mongo::BSONObj &
update,
410 const std::string & collection,
414 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
416 log_deb(std::string(
"Executing findOneAndUpdate " +
update.toString() +
" for filter " 417 + filter.toString() +
" on collection " + collection));
422 return mongodb_client->findAndModify(collection, filter,
update, upsert, return_new);
423 }
catch (DBException &e) {
424 std::string error =
"Error for update " +
update.toString() +
" for query " + filter.toString()
425 +
"\n Exception: " + e.toString();
426 log_deb(error,
"error");
428 b.append(
"error", error);
442 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
443 log_deb(std::string(
"Executing Remove " +
query.toString() +
" on collection " + collection));
450 mongodb_client->remove(collection,
query);
451 }
catch (DBException &e) {
452 log_deb(std::string(
"Error for query " +
query.toString() +
"\n Exception: " + e.toString()),
470 const std::string &collection,
471 const std::string &js_map_fun,
472 const std::string &js_reduce_fun)
474 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
476 log_deb(std::string(
"Executing MapReduce " +
query.toString() +
" on collection " + collection
477 +
" map: " + js_map_fun +
" reduce: " + js_reduce_fun));
478 return mongodb_client->mapreduce(collection, js_map_fun, js_reduce_fun,
query);
490 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
492 log_deb(std::string(
"Executing Aggregation pipeline: " + pipeline.toString() +
" on collection " 497 cursor = mongodb_client->aggregate(collection, pipeline);
498 }
catch (DBException &e) {
500 std::string(
"Error for query ") + pipeline.toString() +
"\n Exception: " + e.toString();
515 mongo::DBClientBase *mongodb_client = get_mongodb_client(collection);
517 log_deb(
"Dropping collection " + collection);
518 return mongodb_client->dropCollection(collection);
531 log_deb(
"Clearing whole robot memory");
532 mongodb_client_local_->dropDatabase(database_name_);
545 std::string coll{std::move(collection)};
552 if (coll.find(
".") == std::string::npos) {
553 log(std::string(
"Unable to restore collection" + coll),
"error");
554 log(std::string(
"Specify collection like 'db.collection'"),
"error");
557 std::string path = StringConversions::resolve_path(directory) +
"/" 558 + coll.replace(coll.find(
"."), 1,
"/") +
".bson";
559 log_deb(std::string(
"Restore collection " + collection +
" from " + path),
"warn");
562 std::string command =
"/usr/bin/mongorestore --dir " + path +
" --host=127.0.0.1 --quiet";
563 log_deb(std::string(
"Restore command: " + command),
"warn");
564 FILE *bash_output = popen(command.c_str(),
"r");
568 log(std::string(
"Unable to restore collection" + coll),
"error");
571 std::string output_string =
"";
573 while (!feof(bash_output)) {
574 if (fgets(buffer, 100, bash_output) == NULL) {
577 output_string += buffer;
580 if (output_string.find(
"Failed") != std::string::npos) {
581 log(std::string(
"Unable to restore collection" + coll),
"error");
582 log_deb(output_string,
"error");
601 if (collection.find(
".") == std::string::npos) {
602 log(std::string(
"Unable to dump collection" + collection),
"error");
603 log(std::string(
"Specify collection like 'db.collection'"),
"error");
606 std::string path = StringConversions::resolve_path(directory);
607 log_deb(std::string(
"Dump collection " + collection +
" into " + path),
"warn");
610 std::vector<std::string> split = str_split(collection,
'.');
611 std::string command =
"/usr/bin/mongodump --out=" + path +
" --db=" + split[0]
612 +
" --collection=" + split[1] +
" --host=127.0.0.1 --quiet";
613 log_deb(std::string(
"Dump command: " + command),
"warn");
614 FILE *bash_output = popen(command.c_str(),
"r");
617 log(std::string(
"Unable to dump collection" + collection),
"error");
620 std::string output_string =
"";
622 while (!feof(bash_output)) {
623 if (fgets(buffer, 100, bash_output) == NULL) {
626 output_string += buffer;
629 if (output_string.find(
"Failed") != std::string::npos) {
630 log(std::string(
"Unable to dump collection" + collection),
"error");
631 log_deb(output_string,
"error");
638 RobotMemory::log(
const std::string &what,
const std::string &info)
640 if (!info.compare(
"error"))
641 logger_->log_error(name_,
"%s", what.c_str());
642 else if (!info.compare(
"warn"))
643 logger_->log_warn(name_,
"%s", what.c_str());
644 else if (!info.compare(
"debug"))
645 logger_->log_debug(name_,
"%s", what.c_str());
647 logger_->log_info(name_,
"%s", what.c_str());
651 RobotMemory::log_deb(
const std::string &what,
const std::string &level)
659 RobotMemory::log_deb(
const mongo::Query &
query,
const std::string &what,
const std::string &level)
662 log(
query, what, level);
667 RobotMemory::log(
const mongo::Query &
query,
const std::string &what,
const std::string &level)
670 what +
"\nFilter: " +
query.getFilter().toString()
671 +
"\nModifiers: " +
query.getModifiers().toString() +
"\nSort: " +
query.getSort().toString()
672 +
"\nHint: " +
query.getHint().toString() +
"\nReadPref: " +
query.getReadPref().toString();
677 RobotMemory::log_deb(
const mongo::BSONObj &obj,
const std::string &what,
const std::string &level)
679 log(obj, what, level);
683 RobotMemory::log(
const mongo::BSONObj &obj,
const std::string &what,
const std::string &level)
685 std::string output = what +
"\nObject: " + obj.toString();
690 RobotMemory::set_fields(mongo::BSONObj &obj,
const std::string &what)
693 b.appendElements(obj);
694 b.appendElements(fromjson(what));
700 RobotMemory::set_fields(mongo::Query &q,
const std::string &what)
703 b.appendElements(q.getFilter());
704 b.appendElements(fromjson(what));
718 RobotMemory::remove_field(mongo::Query &q,
const std::string &what)
721 b.appendElements(q.getFilter().removeField(what));
737 mongo::DBClientBase *
738 RobotMemory::get_mongodb_client(
const std::string &collection)
741 return mongodb_client_local_;
744 size_t point_pos = collection.find(
".");
745 if (point_pos == collection.npos) {
746 logger_->log_error(name_,
"Collection %s needs to start with 'dbname.'", collection.c_str());
747 return mongodb_client_local_;
749 std::string db = collection.substr(0, point_pos);
750 if (std::find(distributed_dbs_.begin(), distributed_dbs_.end(), db) != distributed_dbs_.end()) {
751 return mongodb_client_distributed_;
753 return mongodb_client_local_;
763 trigger_manager_->remove_trigger(trigger);
773 computables_manager_->remove_computable(computable);
787 mongo::DBClientInterface *client =
788 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
789 mongo::BSONObjBuilder insert_doc;
790 insert_doc.append(
"$currentDate", BSON(
"lock-time" <<
true));
791 insert_doc.append(
"_id", name);
792 insert_doc.append(
"locked",
false);
795 client->insert(cfg_coord_mutex_collection_,
798 &mongo::WriteConcern::majority);
800 }
catch (mongo::DBException &e) {
801 logger_->log_info(name_,
"Failed to create mutex %s: %s", name.c_str(), e.what());
815 mongo::DBClientInterface *client =
816 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
817 mongo::BSONObj destroy_doc{BSON(
"_id" << name)};
820 client->remove(cfg_coord_mutex_collection_, destroy_doc,
true, &mongo::WriteConcern::majority);
822 }
catch (mongo::DBException &e) {
823 logger_->log_info(name_,
"Failed to destroy mutex %s: %s", name.c_str(), e.what());
842 mongo::DBClientBase *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
844 std::string locked_by{identity};
845 if (identity.empty()) {
847 locked_by = host_info.
name();
851 mongo::BSONObjBuilder filter_doc;
852 filter_doc.append(
"_id", name);
854 filter_doc.append(
"locked",
false);
857 mongo::BSONObjBuilder update_doc;
858 update_doc.append(
"$currentDate", BSON(
"lock-time" <<
true));
859 mongo::BSONObjBuilder update_set;
860 update_set.append(
"locked",
true);
861 update_set.append(
"locked-by", locked_by);
862 update_doc.append(
"$set", update_set.obj());
866 BSONObj new_doc = client->findAndModify(cfg_coord_mutex_collection_,
873 &mongo::WriteConcern::majority);
875 return (new_doc.getField(
"locked-by").String() == locked_by
876 && new_doc.getField(
"locked").Bool());
878 }
catch (mongo::OperationException &e) {
879 logger_->log_error(name_,
"Mongo OperationException: %s", e.what());
881 mongo::BSONObjBuilder check_doc;
882 check_doc.append(
"_id", name);
883 check_doc.append(
"locked",
true);
884 check_doc.append(
"locked-by", locked_by);
886 BSONObj res_doc = client->findOne(cfg_coord_mutex_collection_, check_doc.obj());
887 logger_->log_info(name_,
"Checking whether mutex was acquired succeeded");
888 if (!res_doc.isEmpty()) {
889 logger_->log_warn(name_,
890 "Exception during try-lock for %s, " 891 "but mutex was still acquired",
894 logger_->log_info(name_,
895 "Exception during try-lock for %s, " 896 "and mutex was not acquired",
899 return !res_doc.isEmpty();
900 }
catch (mongo::OperationException &e) {
901 logger_->log_error(name_,
902 "Mongo OperationException while handling " 903 "the first exception: %s",
934 mongo::DBClientBase *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
936 std::string locked_by{identity};
937 if (identity.empty()) {
939 locked_by = host_info.
name();
943 mongo::BSONObj filter_doc{BSON(
"_id" << name <<
"locked-by" << locked_by)};
945 mongo::BSONObj update_doc{BSON(
"$set" << BSON(
"locked" <<
false) <<
"$unset" 946 << BSON(
"locked-by" <<
true <<
"lock-time" <<
true))};
950 BSONObj new_doc = client->findAndModify(cfg_coord_mutex_collection_,
957 &mongo::WriteConcern::majority);
960 }
catch (mongo::OperationException &e) {
977 mongo::DBClientBase *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
979 std::string locked_by{identity};
980 if (identity.empty()) {
982 locked_by = host_info.
name();
986 mongo::BSONObj filter_doc{BSON(
"_id" << name <<
"locked" <<
true <<
"locked-by" << locked_by)};
990 mongo::BSONObjBuilder update_doc;
991 update_doc.append(
"$currentDate", BSON(
"lock-time" <<
true));
992 mongo::BSONObjBuilder update_set;
993 update_set.append(
"locked",
true);
994 update_set.append(
"locked-by", locked_by);
995 update_doc.append(
"$set", update_set.obj());
999 BSONObj new_doc = client->findAndModify(cfg_coord_mutex_collection_,
1006 &mongo::WriteConcern::majority);
1009 }
catch (mongo::OperationException &e) {
1010 logger_->log_warn(name_,
"Renewing lock on mutex %s failed: %s", name.c_str(), e.what());
1031 mongo::DBClientBase *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1033 BSONObj keys = BSON(
"lock-time" <<
true);
1036 client->createIndex(cfg_coord_mutex_collection_,
1037 mongo::IndexSpec().addKeys(keys).expireAfterSeconds(max_age_sec));
1038 }
catch (DBException &e) {
1039 logger_->log_warn(name_,
"Creating TTL index failed: %s", e.what());
1054 mongo::DBClientBase *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1056 using std::chrono::high_resolution_clock;
1057 using std::chrono::milliseconds;
1058 using std::chrono::time_point;
1059 using std::chrono::time_point_cast;
1061 auto max_age_ms = milliseconds(static_cast<unsigned long int>(std::floor(max_age_sec * 1000)));
1062 time_point<high_resolution_clock, milliseconds> expire_before =
1063 time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1064 mongo::Date_t expire_before_mdb(expire_before.time_since_epoch().count());
1067 mongo::BSONObj filter_doc{
1068 BSON(
"locked" <<
true <<
"lock-time" << mongo::LT << expire_before_mdb)};
1070 mongo::BSONObjBuilder update_doc;
1071 mongo::BSONObjBuilder update_set;
1072 update_set.append(
"locked",
false);
1073 update_set.append(
"locked-by",
"");
1074 update_doc.append(
"$set", update_set.obj());
1078 client->update(cfg_coord_mutex_collection_,
1083 &mongo::WriteConcern::majority);
1086 }
catch (mongo::OperationException &e) {
QResCursor query(mongo::Query query, const std::string &collection="")
Query information from the robot memory.
Manager to realize triggers on events in the robot memory.
This class manages registering computables and can check if any computables are invoked by a query.
bool mutex_unlock(const std::string &name, const std::string &identity)
Release lock on mutex.
int create_index(mongo::BSONObj keys, const std::string &collection="", bool unique=false)
Create an index on a collection.
int remove(mongo::Query query, const std::string &collection="")
Remove documents from the robot memory.
Fawkes library namespace.
int update(mongo::Query query, mongo::BSONObj update, const std::string &collection="", bool upsert=false)
Updates documents in the robot memory.
mongo::BSONObj aggregate(const std::vector< mongo::BSONObj > &pipeline, const std::string &collection="")
Aggregation call on the robot memory.
This is supposed to be the central clock in Fawkes.
Class holding information for a single computable. This class also enhances computed documents by addi...
mongo::BSONObj find_one_and_update(const mongo::BSONObj &filter, const mongo::BSONObj &update, const std::string &collection, bool upsert=false, bool return_new=true)
Atomically update and retrieve document.
void remove_computable(Computable *computable)
Remove previously registered computable.
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
int restore_collection(const std::string &collection, const std::string &directory="@CONFDIR@/robot-memory")
Restore a previously dumped collection from a directory.
Class holding all information about an EventTrigger.
mongo::BSONObj mapreduce(mongo::Query query, const std::string &collection, const std::string &js_map_fun, const std::string &js_reduce_fun)
Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
int insert(mongo::BSONObj obj, const std::string &collection="")
Inserts a document into the robot memory.
int drop_collection(const std::string &collection)
Drop (= remove) a whole collection and all documents inside it.
bool mutex_try_lock(const std::string &name, bool force=false)
Try to acquire a lock for a mutex.
Base class for exceptions in Fawkes.
bool mutex_renew_lock(const std::string &name, const std::string &identity)
Renew a mutex.
const char * name()
Get full hostname.
RobotMemory(fawkes::Configuration *config, fawkes::Logger *logger, fawkes::Clock *clock, fawkes::MongoDBConnCreator *mongo_connection_manager, fawkes::BlackBoard *blackboard)
Robot Memory Constructor with objects of the thread.
bool mutex_setup_ttl(float max_age_sec)
Setup time-to-live index for mutexes.
Interface for a MongoDB connection creator.
bool mutex_create(const std::string &name)
Explicitly create a mutex.
bool mutex_destroy(const std::string &name)
Destroy a mutex.
int clear_memory()
Remove the whole database of the robot memory and all documents inside.
The BlackBoard abstract class.
bool mutex_expire_locks(float max_age_sec)
Expire old locks on mutexes.
Interface for configuration handling.
int dump_collection(const std::string &collection, const std::string &directory="@CONFDIR@/robot-memory")
Dump (= save) a collection to the filesystem to restore it later.