44 #include <D4ParserSax2.h> 45 #include <XMLWriter.h> 46 #include <BaseTypeFactory.h> 47 #include <D4BaseTypeFactory.h> 49 #include "PicoSHA2/picosha2.h" 52 #include "TheBESKeys.h" 55 #include "BESContextManager.h" 58 #include "BESInternalError.h" 59 #include "BESInternalFatalError.h" 61 #include "GlobalMetadataStore.h" 63 #define DEBUG_KEY "metadata_store" 64 #define MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED 0 67 #define AT_EXIT(x) atexit((x)) 81 #undef SYMETRIC_ADD_RESPONSES 87 static const unsigned int default_cache_size = 20;
88 static const string default_cache_prefix =
"mds";
89 static const string default_cache_dir =
"";
90 static const string default_ledger_name =
"mds_ledger.txt";
92 static const string PATH_KEY =
"DAP.GlobalMetadataStore.path";
93 static const string PREFIX_KEY =
"DAP.GlobalMetadataStore.prefix";
94 static const string SIZE_KEY =
"DAP.GlobalMetadataStore.size";
95 static const string LEDGER_KEY =
"DAP.GlobalMetadataStore.ledger";
96 static const string LOCAL_TIME_KEY =
"BES.LogTimeLocal";
99 bool GlobalMetadataStore::d_enabled =
true;
115 void GlobalMetadataStore::transfer_bytes(
int fd, ostream &os)
117 static const int BUFFER_SIZE = 16*1024;
119 #if _POSIX_C_SOURCE >= 200112L 121 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
123 ERROR(
"Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
126 char buf[BUFFER_SIZE + 1];
128 while(
int bytes_read = read(fd, buf, BUFFER_SIZE))
131 throw BESInternalError(
"Could not read dds from the metadata store.", __FILE__, __LINE__);
135 os.write(buf, bytes_read);
151 void GlobalMetadataStore::insert_xml_base(
int fd, ostream &os,
const string &xml_base)
153 static const int BUFFER_SIZE = 1024;
155 #if _POSIX_C_SOURCE >= 200112L 157 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
159 ERROR(
"Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
162 char buf[BUFFER_SIZE + 1];
163 size_t bytes_read = read(fd, buf, BUFFER_SIZE);
165 if(bytes_read == (
size_t)-1)
166 throw BESInternalError(
"Could not read dds from the metadata store.", __FILE__, __LINE__);
183 while (buf[i++] !=
'>')
190 char xml_base_literal[] =
"xml:base";
191 while (i < bytes_read) {
193 os.write(buf + s, i - s);
194 os <<
" xml:base=\"" << xml_base <<
"\"";
197 else if (j ==
sizeof(xml_base_literal) - 1) {
198 os.write(buf + s, i - s);
199 while (buf[i++] !=
'=')
201 while (buf[i++] !=
'"')
203 while (buf[i++] !=
'"')
205 os <<
"=\"" << xml_base <<
"\"";
208 else if (buf[i] == xml_base_literal[j]) {
219 os.write(buf + i, bytes_read - i);
222 transfer_bytes(fd, os);
225 unsigned long GlobalMetadataStore::get_cache_size_from_config()
229 unsigned long size_in_megabytes = default_cache_size;
233 "GlobalMetadataStore::getCacheSizeFromConfig(): Located BES key " << SIZE_KEY <<
"=" << size << endl);
234 istringstream iss(size);
235 iss >> size_in_megabytes;
238 return size_in_megabytes;
241 string GlobalMetadataStore::get_cache_prefix_from_config()
244 string prefix = default_cache_prefix;
248 "GlobalMetadataStore::getCachePrefixFromConfig(): Located BES key " << PREFIX_KEY <<
"=" << prefix << endl);
256 string GlobalMetadataStore::get_cache_dir_from_config()
260 string cacheDir = default_cache_dir;
264 "GlobalMetadataStore::getCacheDirFromConfig(): Located BES key " << PATH_KEY<<
"=" << cacheDir << endl);
302 GlobalMetadataStore::get_instance(
const string &cache_dir,
const string &prefix,
unsigned long long size)
304 if (d_enabled && d_instance == 0) {
306 d_enabled = d_instance->cache_enabled();
311 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is DISABLED"<< endl);
314 AT_EXIT(delete_instance);
316 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is ENABLED"<< endl);
320 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::get_instance(dir,prefix,size) - d_instance: " << d_instance << endl);
332 GlobalMetadataStore::get_instance()
334 if (d_enabled && d_instance == 0) {
335 d_instance =
new GlobalMetadataStore(get_cache_dir_from_config(), get_cache_prefix_from_config(),
336 get_cache_size_from_config());
337 d_enabled = d_instance->cache_enabled();
341 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is DISABLED"<< endl);
344 AT_EXIT(delete_instance);
346 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is ENABLED"<< endl);
350 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::get_instance() - d_instance: " << (
void *) d_instance << endl);
360 GlobalMetadataStore::initialize()
366 BESDEBUG(DEBUG_KEY,
"Located BES key " << LEDGER_KEY <<
"=" << d_ledger_name << endl);
369 d_ledger_name = default_ledger_name;
373 string local_time =
"no";
375 d_use_local_time = (local_time ==
"YES" || local_time ==
"Yes" || local_time ==
"yes");
392 GlobalMetadataStore::GlobalMetadataStore()
394 :
BESFileLockingCache(get_cache_dir_from_config(), get_cache_prefix_from_config(), get_cache_size_from_config())
410 static void dump_time(ostream &os,
bool use_local_time)
414 char buf[
sizeof "YYYY-MM-DDTHH:MM:SSzone"];
424 status = strftime(buf,
sizeof buf,
"%FT%T%Z", gmtime(&now));
426 status = strftime(buf,
sizeof buf,
"%FT%T%Z", localtime(&now));
429 LOG(
"Error getting time for Metadata Store ledger.");
442 ofstream of(d_ledger_name.c_str(), ios::app);
444 dump_time(of, d_use_local_time);
445 of <<
" " << d_ledger_entry << endl;
446 VERBOSE(
"MD Ledger name: '" << d_ledger_name <<
"', entry: '" << d_ledger_entry +
"'.");
449 LOG(
"Warning: Metadata store could not write to is ledger file.");
463 throw BESInternalError(
"Empty name passed to the Metadata Store.", __FILE__, __LINE__);
465 return picosha2::hash256_hex_string(name[0] ==
'/' ? name :
"/" + name);
487 D4BaseTypeFactory factory;
488 DMR dmr(&factory, *d_dds);
495 d_dmr->print_dap4(xml);
508 d_dmr->getDDS()->print(os);
516 d_dds->print_das(os);
518 d_dmr->getDDS()->print_das(os);
539 const string &response_name)
541 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" BEGIN " << key << endl);
549 BESDEBUG(DEBUG_KEY,__FUNCTION__ <<
" Storing " << item_name << endl);
552 ofstream response(item_name.c_str(), ios::out|ios::app);
553 if (!response.is_open())
554 throw BESInternalError(
"Could not open '" + key +
"' to write the response.", __FILE__, __LINE__);
563 if (!
is_unlimited() || MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED) {
581 VERBOSE(
"Metadata store: Wrote " << response_name <<
" response for '" << name <<
"'." << endl);
582 d_ledger_entry.append(
" ").append(key);
588 BESDEBUG(DEBUG_KEY,__FUNCTION__ <<
" Found " << item_name <<
" in the store already." << endl);
591 LOG(
"Metadata store: unable to store the " << response_name <<
" response for '" << name <<
"'." << endl);
596 throw BESInternalError(
"Could neither create or open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
630 d_ledger_entry = string(
"add DDS ").append(name);
643 #if SYMETRIC_ADD_RESPONSES 650 #if SYMETRIC_ADD_RESPONSES 651 return (stored_dds && stored_das && stored_dmr);
653 return (stored_dds && stored_das);
672 d_ledger_entry = string(
"add DMR ").append(name);
679 #if SYMETRIC_ADD_RESPONSES 692 #if SYMETRIC_ADD_RESPONSES 693 return (stored_dds && stored_das && stored_dmr);
715 BESDEBUG(DEBUG_KEY, __func__ <<
"() MDS hashing name '" << name <<
"', '" << suffix <<
"'"<< endl);
719 "GlobalMetadataStore::get_read_lock_helper(). That should never happen.", __FILE__, __LINE__);
724 BESDEBUG(DEBUG_KEY, __func__ <<
"() MDS lock for " << item_name <<
": " << lock() << endl);
727 LOG(
"MDS Cache hit for '" << name <<
"' and response " << object_name << endl);
729 LOG(
"MDS Cache miss for '" << name <<
"' and response " << object_name << endl);
820 VERBOSE(
"Metadata store: Cache hit: read " << object_name <<
" response for '" << name <<
"'." << endl);
821 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Found " << item_name <<
" in the store." << endl);
832 throw BESInternalError(
"Could not open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
846 const string &object_name)
851 VERBOSE(
"Metadata store: Cache hit: read " << object_name <<
" response for '" << name <<
"'." << endl);
852 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Found " << item_name <<
" in the store." << endl);
865 throw BESInternalError(
"Could not open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
904 string xml_base = BESContextManager::TheManager()->
get_context(
"xml:base", found);
906 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE 909 throw BESInternalError(
"Could not read the value of xml:base.", __FILE__, __LINE__);
927 string xml_base = BESContextManager::TheManager()->
get_context(
"xml:base", found);
929 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE 932 throw BESInternalError(
"Could not read the value of xml:base.", __FILE__, __LINE__);
950 string hash =
get_hash(name + suffix);
952 VERBOSE(
"Metadata store: Removed " << object_name <<
" response for '" << hash <<
"'." << endl);
953 d_ledger_entry.append(
" ").append(hash);
957 LOG(
"Metadata store: unable to remove the " << object_name <<
" response for '" << name <<
"' (" << strerror(errno) <<
")."<< endl);
973 d_ledger_entry = string(
"remove ").append(name);
985 #if SYMETRIC_ADD_RESPONSES 986 return (removed_dds && removed_das && removed_dmr);
988 return (removed_dds || removed_das || removed_dmr || removed_dmrpp);
1010 D4BaseTypeFactory d4_btf;
1011 auto_ptr<DMR> dmr(
new DMR(&d4_btf,
"mds"));
1013 D4ParserSax2 parser;
1014 parser.intern(oss.str(), dmr.get());
1016 dmr->set_factory(0);
1018 return dmr.release();
1047 fstream dds_fs(dds_tmp.
get_name().c_str(), std::fstream::out);
1057 BaseTypeFactory btf;
1058 auto_ptr<DDS> dds(
new DDS(&btf));
1062 fstream das_fs(das_tmp.
get_name().c_str(), std::fstream::out);
1072 auto_ptr<DAS> das(
new DAS());
1075 dds->transfer_attributes(das.get());
1076 dds->set_factory(0);
1078 return dds.release();
virtual bool cache_too_big(unsigned long long current_size) const
Is the cache too large? Look at the cache size and see if it is too big.
exception thrown if an internal error is found and is fatal to the BES
exception thrown if an internal error is encountered
std::string get_name() const
static string lowercase(const string &s)
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
virtual void unlock_and_close(const std::string &target)
void get_value(const std::string &s, std::string &val, bool &found)
Retrieve the value of a given key, if set.
virtual string get_context(const string &name, bool &found)
Retrieve the value of the specified context from the BES.
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
Implementation of a caching mechanism for compressed data.
static TheBESKeys * TheKeys()
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
const std::string get_cache_directory()
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
Get a new temporary file.
virtual void purge_file(const std::string &file)
Purge a single file from the cache.