45 #include <D4ParserSax2.h>
46 #include <XMLWriter.h>
47 #include <BaseTypeFactory.h>
48 #include <D4BaseTypeFactory.h>
50 #include "PicoSHA2/picosha2.h"
53 #include "TheBESKeys.h"
56 #include "BESContextManager.h"
58 #include "BESRequestHandler.h"
59 #include "BESRequestHandlerList.h"
60 #include "BESNotFoundError.h"
62 #include "BESInternalError.h"
63 #include "BESInternalFatalError.h"
65 #include "GlobalMetadataStore.h"
// Debug context name; passed as the first argument of every BESDEBUG() call in this file.
67 #define DEBUG_KEY "metadata_store"
// Second operand of the `!is_unlimited() || ...` test further down in this file. With the
// value 0 that test depends on is_unlimited() alone.
68 #define MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED 0
// Thin wrapper over atexit(); the get_instance() overloads use it to register delete_instance.
71 #define AT_EXIT(x) atexit((x))
// Leaving this macro undefined makes every `#if SYMETRIC_ADD_RESPONSES` region in this file
// evaluate to 0. The spelling must stay as-is because those #if lines use the same spelling.
85 #undef SYMETRIC_ADD_RESPONSES
// Fallback values used when the matching BES key is not set.
// default_cache_size seeds the variable named size_in_megabytes in get_cache_size_from_config().
91 static const unsigned int default_cache_size = 20;
92 static const string default_cache_prefix =
"mds";
93 static const string default_cache_dir =
"";
94 static const string default_ledger_name =
"mds_ledger.txt";
// Names of the BES configuration keys this file refers to (store path, file prefix,
// store size, ledger file name).
96 static const string PATH_KEY =
"DAP.GlobalMetadataStore.path";
97 static const string PREFIX_KEY =
"DAP.GlobalMetadataStore.prefix";
98 static const string SIZE_KEY =
"DAP.GlobalMetadataStore.size";
99 static const string LEDGER_KEY =
"DAP.GlobalMetadataStore.ledger";
// NOTE(review): presumably the key behind d_use_local_time in initialize(); the lookup
// itself is not visible in this excerpt - confirm.
100 static const string LOCAL_TIME_KEY =
"BES.LogTimeLocal";
// Starts out true; the get_instance() overloads overwrite it with d_instance->cache_enabled().
103 bool GlobalMetadataStore::d_enabled =
true;
// Copy bytes from the open file descriptor 'fd' to the stream 'os', BUFFER_SIZE bytes
// per read() call.
//
// @param fd Descriptor to read from.
// @param os Stream that receives the bytes.
// @throws BESInternalError with the "Could not read dds ..." message below (the guarding
//         condition is not visible in this excerpt).
119 void GlobalMetadataStore::transfer_bytes(
int fd, ostream &os)
121 static const int BUFFER_SIZE = 16*1024;
// Only compiled when the POSIX 2001 feature level is available: hint that the file will
// be read sequentially. posix_fadvise() returns an error number, hence strerror(status).
123 #if _POSIX_C_SOURCE >= 200112L
125 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
127 ERROR(
"Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
// NOTE(review): the buffer has one byte more than read() is ever asked to fill; no use of
// that extra byte is visible here.
130 char buf[BUFFER_SIZE + 1];
// The loop ends when read() returns 0 (end of file). A return of -1 is non-zero, so it
// still enters the loop body.
132 while(
int bytes_read = read(fd, buf, BUFFER_SIZE))
// NOTE(review): the message says "dds" even though this helper takes an arbitrary
// descriptor - confirm whether the wording should be generic.
135 throw BESInternalError(
"Could not read dds from the metadata store.", __FILE__, __LINE__);
// Write exactly the number of bytes the last read() produced.
139 os.write(buf, bytes_read);
// Copy the response readable from 'fd' to 'os', writing an xml:base="<xml_base>" attribute
// into the leading part of the document on the way. Only the first BUFFER_SIZE bytes are
// examined; the remainder is copied unchanged by transfer_bytes().
//
// @param fd Descriptor of the stored response.
// @param os Stream that receives the rewritten response.
// @param xml_base Value written as the xml:base attribute.
// @throws BESInternalError if the initial read() fails.
155 void GlobalMetadataStore::insert_xml_base(
int fd, ostream &os,
const string &xml_base)
157 static const int BUFFER_SIZE = 1024;
// Same optional sequential-read hint that transfer_bytes() issues.
159 #if _POSIX_C_SOURCE >= 200112L
161 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
163 ERROR(
"Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
166 char buf[BUFFER_SIZE + 1];
// read() returns a signed count, but it is stored in an unsigned size_t here, so the
// failure value -1 is detected by comparing against (size_t)-1.
167 size_t bytes_read = read(fd, buf, BUFFER_SIZE);
169 if(bytes_read == (
size_t)-1)
170 throw BESInternalError(
"Could not read dds from the metadata store.", __FILE__, __LINE__);
// Advance i just past the first '>' in the buffer.
// NOTE(review): no comparison of i against bytes_read is visible for this scan; if the
// first BUFFER_SIZE bytes contain no '>' it would run past the data that was read - confirm.
187 while (buf[i++] !=
'>')
194 char xml_base_literal[] =
"xml:base";
// Scan the rest of the buffered bytes. Two rewriting branches are visible below.
195 while (i < bytes_read) {
// Branch 1 (its condition is not visible in this excerpt): emit buf[s, i) and then append
// a new xml:base attribute.
197 os.write(buf + s, i - s);
198 os <<
" xml:base=\"" << xml_base <<
"\"";
// Branch 2: j has reached the length of "xml:base" (sizeof minus the terminating NUL).
// Emit buf[s, i), skip past '=', the opening quote and the closing quote of the existing
// value, then emit the replacement value.
// NOTE(review): these three skip loops also show no bound check against bytes_read.
201 else if (j ==
sizeof(xml_base_literal) - 1) {
202 os.write(buf + s, i - s);
203 while (buf[i++] !=
'=')
205 while (buf[i++] !=
'"')
207 while (buf[i++] !=
'"')
209 os <<
"=\"" << xml_base <<
"\"";
// Branch 3: the current byte matches the next expected character of "xml:base".
212 else if (buf[i] == xml_base_literal[j]) {
// Flush whatever is left of the first buffer after position i ...
223 os.write(buf + i, bytes_read - i);
// ... and stream the rest of the file unchanged.
226 transfer_bytes(fd, os);
// Return the configured store size, falling back to default_cache_size.
// The local variable name indicates the unit is megabytes.
// NOTE(review): the debug text below names "getCacheSizeFromConfig", which does not match
// this function's name.
229 unsigned long GlobalMetadataStore::get_cache_size_from_config()
// Start from the default; it is overwritten below when a value string is parsed.
233 unsigned long size_in_megabytes = default_cache_size;
237 "GlobalMetadataStore::getCacheSizeFromConfig(): Located BES key " << SIZE_KEY <<
"=" << size << endl);
// Parse the textual value as an unsigned integer.
// NOTE(review): no check of the stream state after the extraction is visible; confirm
// what should happen for a non-numeric value.
238 istringstream iss(size);
239 iss >> size_in_megabytes;
242 return size_in_megabytes;
// Return the file-name prefix for items in the store; starts from default_cache_prefix.
// The debug text below reports the value associated with PREFIX_KEY.
// NOTE(review): the debug text names "getCachePrefixFromConfig", not this function's name.
245 string GlobalMetadataStore::get_cache_prefix_from_config()
248 string prefix = default_cache_prefix;
252 "GlobalMetadataStore::getCachePrefixFromConfig(): Located BES key " << PREFIX_KEY <<
"=" << prefix << endl);
// Return the directory of the store; starts from default_cache_dir (the empty string).
// The debug text below reports the value associated with PATH_KEY.
// NOTE(review): the debug text names "getCacheDirFromConfig", not this function's name.
260 string GlobalMetadataStore::get_cache_dir_from_config()
264 string cacheDir = default_cache_dir;
268 "GlobalMetadataStore::getCacheDirFromConfig(): Located BES key " << PATH_KEY<<
"=" << cacheDir << endl);
// Singleton accessor taking explicit store settings.
//
// @param cache_dir Directory for the store.
// @param prefix File-name prefix for items in the store.
// @param size Store size.
// NOTE(review): the statement that allocates d_instance is not visible in this excerpt;
// the parameterless overload builds it with `new GlobalMetadataStore(...)`.
306 GlobalMetadataStore::get_instance(
const string &cache_dir,
const string &prefix,
unsigned long long size)
// Only attempt creation while the store is still enabled and no instance exists yet.
308 if (d_enabled && d_instance == 0) {
// Record whether the freshly built cache reports itself as enabled.
310 d_enabled = d_instance->cache_enabled();
315 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is DISABLED"<< endl);
// Register delete_instance to run at process exit.
318 AT_EXIT(delete_instance);
320 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is ENABLED"<< endl);
// NOTE(review): the parameterless overload casts d_instance to (void *) before streaming
// it; this one streams the pointer directly.
324 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::get_instance(dir,prefix,size) - d_instance: " << d_instance << endl);
// Singleton accessor that takes directory, prefix and size from the get_cache_*_from_config()
// helpers.
336 GlobalMetadataStore::get_instance()
// Only attempt creation while the store is still enabled and no instance exists yet.
338 if (d_enabled && d_instance == 0) {
339 d_instance =
new GlobalMetadataStore(get_cache_dir_from_config(), get_cache_prefix_from_config(),
340 get_cache_size_from_config());
// Record whether the freshly built cache reports itself as enabled.
341 d_enabled = d_instance->cache_enabled();
345 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is DISABLED"<< endl);
// Register delete_instance to run at process exit.
348 AT_EXIT(delete_instance);
350 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is ENABLED"<< endl);
// The pointer is cast to (void *) so the stream prints its address.
354 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::get_instance() - d_instance: " << (
void *) d_instance << endl);
// Set up the ledger file name and the local-time flag used when ledger entries are stamped.
364 GlobalMetadataStore::initialize()
// Reported when a value for LEDGER_KEY was located.
370 BESDEBUG(DEBUG_KEY,
"Located BES key " << LEDGER_KEY <<
"=" << d_ledger_name << endl);
// Fallback ledger name.
373 d_ledger_name = default_ledger_name;
// Open the ledger in append mode so existing entries are kept.
376 ofstream of(d_ledger_name.c_str(), ios::app);
// Default is "no". Only the exact spellings "YES", "Yes" and "yes" enable local time;
// any other casing leaves d_use_local_time false.
379 string local_time =
"no";
381 d_use_local_time = (local_time ==
"YES" || local_time ==
"Yes" || local_time ==
"yes");
// Default constructor: initializes the BESFileLockingCache base with the directory, prefix
// and size returned by the get_cache_*_from_config() helpers.
398 GlobalMetadataStore::GlobalMetadataStore()
400 :
BESFileLockingCache(get_cache_dir_from_config(), get_cache_prefix_from_config(), get_cache_size_from_config())
// File-local helper that formats the current time with the pattern "%FT%T%Z"
// (date, 'T', time, time-zone abbreviation) for the ledger.
//
// @param os Output stream for the time stamp.
// @param use_local_time Selects between the localtime() and gmtime() calls below
//        (the selecting statement itself is not visible in this excerpt).
416 static void dump_time(ostream &os,
bool use_local_time)
// Sized from a sample string: 19 characters of date and time, 4 for the zone, plus the
// terminating NUL that sizeof of a string literal includes.
// NOTE(review): strftime() returns 0 when the result does not fit, so a %Z expansion
// longer than 4 characters would end in the error path below - confirm this is acceptable.
420 char buf[
sizeof "YYYY-MM-DDTHH:MM:SSzone"];
// UTC variant.
430 status = strftime(buf,
sizeof buf,
"%FT%T%Z", gmtime(&now));
// Local-time variant.
432 status = strftime(buf,
sizeof buf,
"%FT%T%Z", localtime(&now));
435 LOG(
"Error getting time for Metadata Store ledger.");
449 if (get_exclusive_lock(d_ledger_name, fd)) {
450 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Ledger " << d_ledger_name <<
" write locked." << endl);
453 dump_time(of, d_use_local_time);
454 of <<
" " << d_ledger_entry << endl;
455 VERBOSE(
"MDS Ledger name: '" << d_ledger_name <<
"', entry: '" << d_ledger_entry +
"'.");
464 LOG(
"Warning: Metadata store could not write to its ledger file.");
469 throw BESInternalError(
"Could not write lock '" + d_ledger_name, __FILE__, __LINE__);
483 throw BESInternalError(
"Empty name passed to the Metadata Store.", __FILE__, __LINE__);
485 return picosha2::hash256_hex_string(name[0] ==
'/' ? name :
"/" + name);
507 D4BaseTypeFactory factory;
508 DMR dmr(&factory, *d_dds);
515 d_dmr->print_dap4(xml);
528 d_dmr->getDDS()->print(os);
536 d_dds->print_das(os);
538 d_dmr->getDDS()->print_das(os);
559 const string &response_name)
561 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" BEGIN " << key << endl);
567 BESDEBUG(DEBUG_KEY,__FUNCTION__ <<
" Storing " << item_name << endl);
570 ofstream response(item_name.c_str(), ios::out|ios::app);
571 if (!response.is_open())
572 throw BESInternalError(
"Could not open '" + key +
"' to write the response.", __FILE__, __LINE__);
581 if (!
is_unlimited() || MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED) {
599 VERBOSE(
"Metadata store: Wrote " << response_name <<
" response for '" << name <<
"'." << endl);
600 d_ledger_entry.append(
" ").append(key);
606 BESDEBUG(DEBUG_KEY,__FUNCTION__ <<
" Found " << item_name <<
" in the store already." << endl);
609 LOG(
"Metadata store: unable to store the " << response_name <<
" response for '" << name <<
"'." << endl);
614 throw BESInternalError(
"Could neither create or open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
648 d_ledger_entry = string(
"add DDS ").append(name);
661 #if SYMETRIC_ADD_RESPONSES
668 #if SYMETRIC_ADD_RESPONSES
669 return (stored_dds && stored_das && stored_dmr);
671 return (stored_dds && stored_das);
690 d_ledger_entry = string(
"add DMR ").append(name);
697 #if SYMETRIC_ADD_RESPONSES
710 #if SYMETRIC_ADD_RESPONSES
711 return (stored_dds && stored_das && stored_dmr);
733 BESDEBUG(DEBUG_KEY, __func__ <<
"() MDS hashing name '" << name <<
"', '" << suffix <<
"'"<< endl);
737 "GlobalMetadataStore::get_read_lock_helper(). That should never happen.", __FILE__, __LINE__);
742 BESDEBUG(DEBUG_KEY, __func__ <<
"() MDS lock for " << item_name <<
": " << lock() << endl);
745 LOG(
"MDS Cache hit for '" << name <<
"' and response " << object_name << endl);
747 LOG(
"MDS Cache miss for '" << name <<
"' and response " << object_name << endl);
975 time_t file_time = besRH->
get_lmt(realName);
981 if (file_time > cache_time){
1000 struct stat statbuf;
1002 if (stat(item_name.c_str(), &statbuf) == -1){
1006 return statbuf.st_mtime;
1025 VERBOSE(
"Metadata store: Cache hit: read " << object_name <<
" response for '" << name <<
"'." << endl);
1026 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Found " << item_name <<
" in the store." << endl);
1037 throw BESInternalError(
"Could not open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
1051 const string &object_name)
1056 VERBOSE(
"Metadata store: Cache hit: read " << object_name <<
" response for '" << name <<
"'." << endl);
1057 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Found " << item_name <<
" in the store." << endl);
1070 throw BESInternalError(
"Could not open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
1109 string xml_base = BESContextManager::TheManager()->
get_context(
"xml:base", found);
1111 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1114 throw BESInternalError(
"Could not read the value of xml:base.", __FILE__, __LINE__);
1132 string xml_base = BESContextManager::TheManager()->
get_context(
"xml:base", found);
1134 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1137 throw BESInternalError(
"Could not read the value of xml:base.", __FILE__, __LINE__);
1155 string hash =
get_hash(name + suffix);
1157 VERBOSE(
"Metadata store: Removed " << object_name <<
" response for '" << hash <<
"'." << endl);
1158 d_ledger_entry.append(
" ").append(hash);
1162 LOG(
"Metadata store: unable to remove the " << object_name <<
" response for '" << name <<
"' (" << strerror(errno) <<
")."<< endl);
1178 d_ledger_entry = string(
"remove ").append(name);
1190 #if SYMETRIC_ADD_RESPONSES
1191 return (removed_dds && removed_das && removed_dmr);
1193 return (removed_dds || removed_das || removed_dmr || removed_dmrpp);
1215 D4BaseTypeFactory d4_btf;
1216 auto_ptr<DMR> dmr(
new DMR(&d4_btf,
"mds"));
1218 D4ParserSax2 parser;
1219 parser.intern(oss.str(), dmr.get());
1221 dmr->set_factory(0);
1223 return dmr.release();
1252 fstream dds_fs(dds_tmp.
get_name().c_str(), std::fstream::out);
1262 BaseTypeFactory btf;
1263 auto_ptr<DDS> dds(
new DDS(&btf));
1267 fstream das_fs(das_tmp.
get_name().c_str(), std::fstream::out);
1277 auto_ptr<DAS> das(
new DAS());
1280 dds->transfer_attributes(das.get());
1281 dds->set_factory(0);
1283 return dds.release();
// Parse the stored DAS response for 'name' into the caller-supplied DAS object.
//
// @param das Object whose parse() is called with the path of the stored item.
// @param name Name of the dataset; used in the log message below.
// @throws BESInternalError with the "Could not open ..." message below (the guarding
//         condition is not visible in this excerpt).
1288 GlobalMetadataStore::parse_das_from_mds(libdap::DAS* das,
const std::string &name) {
// Suffix that identifies the DAS item.
1289 string suffix =
"das_r";
// NOTE(review): the other "Cache hit" messages in this file stream an object name between
// "read " and " response for"; this one has nothing there, so the log text reads
// "read  response for '...'" - confirm whether the response name should be inserted.
1293 VERBOSE(
"Metadata store: Cache hit: read " <<
" response for '" << name <<
"'." << endl);
1294 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Found " << item_name <<
" in the store." << endl);
// item_name is handed to DAS::parse() directly.
1297 das->parse(item_name);
1306 throw BESInternalError(
"Could not open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);