bes  Updated for version 3.20.6
GlobalMetadataStore.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of Hyrax, a C++ implementation of the OPeNDAP Data
4 // Access Protocol.
5 
6 // Copyright (c) 2018 OPeNDAP, Inc.
7 // Author: James Gallagher <jgallagher@opendap.org>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 //
23 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24 
25 #include "config.h"
26 
27 #include <fcntl.h> // for posix_advise
28 #include <unistd.h>
29 
30 #include <cerrno>
31 #include <cstring>
32 
33 #include <iostream>
34 #include <string>
35 #include <fstream>
36 #include <sstream>
37 #include <functional>
38 #include <DAS.h>
39 #include <memory>
40 #include <sys/stat.h>
41 
42 #include <DapObj.h>
43 #include <DDS.h>
44 #include <DMR.h>
45 #include <D4ParserSax2.h>
46 #include <XMLWriter.h>
47 #include <BaseTypeFactory.h>
48 #include <D4BaseTypeFactory.h>
49 
50 #include "PicoSHA2/picosha2.h"
51 
52 #include "TempFile.h"
53 #include "TheBESKeys.h"
54 #include "BESUtil.h"
55 #include "BESLog.h"
56 #include "BESContextManager.h"
57 #include "BESDebug.h"
58 #include "BESRequestHandler.h"
59 #include "BESRequestHandlerList.h"
60 #include "BESNotFoundError.h"
61 
62 #include "BESInternalError.h"
63 #include "BESInternalFatalError.h"
64 
65 #include "GlobalMetadataStore.h"
66 
67 #define DEBUG_KEY "metadata_store"
68 #define MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED 0
69 
70 #ifdef HAVE_ATEXIT
71 #define AT_EXIT(x) atexit((x))
72 #else
73 #define AT_EXIT(x)
74 #endif
75 
85 #undef SYMETRIC_ADD_RESPONSES
86 
87 using namespace std;
88 using namespace libdap;
89 using namespace bes;
90 
91 static const unsigned int default_cache_size = 20; // 20 GB
92 static const string default_cache_prefix = "mds";
93 static const string default_cache_dir = ""; // I'm making the default empty so that no key == no caching. jhrg 9.26.16
94 static const string default_ledger_name = "mds_ledger.txt";
95 
96 static const string PATH_KEY = "DAP.GlobalMetadataStore.path";
97 static const string PREFIX_KEY = "DAP.GlobalMetadataStore.prefix";
98 static const string SIZE_KEY = "DAP.GlobalMetadataStore.size";
99 static const string LEDGER_KEY = "DAP.GlobalMetadataStore.ledger";
100 static const string LOCAL_TIME_KEY = "BES.LogTimeLocal";
101 
102 GlobalMetadataStore *GlobalMetadataStore::d_instance = 0;
103 bool GlobalMetadataStore::d_enabled = true;
104 
119 void GlobalMetadataStore::transfer_bytes(int fd, ostream &os)
120 {
121  static const int BUFFER_SIZE = 16*1024;
122 
123 #if _POSIX_C_SOURCE >= 200112L
124  /* Advise the kernel of our access pattern. */
125  int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
126  if (status != 0)
127  ERROR("Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
128 #endif
129 
130  char buf[BUFFER_SIZE + 1];
131 
132  while(int bytes_read = read(fd, buf, BUFFER_SIZE))
133  {
134  if(bytes_read == -1)
135  throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
136  if (!bytes_read)
137  break;
138 
139  os.write(buf, bytes_read);
140  }
141 }
142 
155 void GlobalMetadataStore::insert_xml_base(int fd, ostream &os, const string &xml_base)
156 {
157  static const int BUFFER_SIZE = 1024;
158 
159 #if _POSIX_C_SOURCE >= 200112L
160  /* Advise the kernel of our access pattern. */
161  int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
162  if (status != 0)
163  ERROR("Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
164 #endif
165 
166  char buf[BUFFER_SIZE + 1];
167  size_t bytes_read = read(fd, buf, BUFFER_SIZE);
168 
169  if(bytes_read == (size_t)-1)
170  throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
171 
172  if (bytes_read == 0)
173  return;
174 
175  // Every valid DMR/++ response in the MDS starts with:
176  // <?xml version="1.0" encoding="ISO‌-8859-1"?>
177  //
178  // and has one of two kinds of <Dataset...> tags
179  // 1: <Dataset xmlns="..." xml:base="file:DMR_1.xml" ... >
180  // 2: <Dataset xmlns="..." ... >
181  //
182  // Assume it is well formed and always includes the prolog,
183  // but might not use <CR> <CRLF> chars
184 
185  // transfer the prolog (<?xml version="1.0" encoding="ISO‌-8859-1"?>)
186  size_t i = 0;
187  while (buf[i++] != '>')
188  ; // 'i' now points one char past the xml prolog
189  os.write(buf, i);
190 
191  // transfer <Dataset ...> with new value for xml:base
192  size_t s = i; // start of <Dataset ...>
193  size_t j = 0;
194  char xml_base_literal[] = "xml:base";
195  while (i < bytes_read) {
196  if (buf[i] == '>') { // Found end of Dataset; no xml:base was present
197  os.write(buf + s, i - s);
198  os << " xml:base=\"" << xml_base << "\"";
199  break;
200  }
201  else if (j == sizeof(xml_base_literal) - 1) { // found 'xml:base' literal
202  os.write(buf + s, i - s); // This will include all of <Dataset... including 'xml:base'
203  while (buf[i++] != '=')
204  ; // read/discard '="..."'
205  while (buf[i++] != '"')
206  ;
207  while (buf[i++] != '"')
208  ;
209  os << "=\"" << xml_base << "\""; // write the new xml:base value
210  break;
211  }
212  else if (buf[i] == xml_base_literal[j]) {
213  ++j;
214  }
215  else {
216  j = 0;
217  }
218 
219  ++i;
220  }
221 
222  // transfer the rest
223  os.write(buf + i, bytes_read - i);
224 
225  // Now, if the response is more than 1k, use faster code to finish the tx
226  transfer_bytes(fd, os);
227 }
228 
229 unsigned long GlobalMetadataStore::get_cache_size_from_config()
230 {
231  bool found;
232  string size;
233  unsigned long size_in_megabytes = default_cache_size;
234  TheBESKeys::TheKeys()->get_value(SIZE_KEY, size, found);
235  if (found) {
236  BESDEBUG(DEBUG_KEY,
237  "GlobalMetadataStore::getCacheSizeFromConfig(): Located BES key " << SIZE_KEY << "=" << size << endl);
238  istringstream iss(size);
239  iss >> size_in_megabytes;
240  }
241 
242  return size_in_megabytes;
243 }
244 
245 string GlobalMetadataStore::get_cache_prefix_from_config()
246 {
247  bool found;
248  string prefix = default_cache_prefix;
249  TheBESKeys::TheKeys()->get_value(PREFIX_KEY, prefix, found);
250  if (found) {
251  BESDEBUG(DEBUG_KEY,
252  "GlobalMetadataStore::getCachePrefixFromConfig(): Located BES key " << PREFIX_KEY << "=" << prefix << endl);
253  prefix = BESUtil::lowercase(prefix);
254  }
255 
256  return prefix;
257 }
258 
259 // If the cache prefix is the empty string, the cache is turned off.
260 string GlobalMetadataStore::get_cache_dir_from_config()
261 {
262  bool found;
263 
264  string cacheDir = default_cache_dir;
265  TheBESKeys::TheKeys()->get_value(PATH_KEY, cacheDir, found);
266  if (found) {
267  BESDEBUG(DEBUG_KEY,
268  "GlobalMetadataStore::getCacheDirFromConfig(): Located BES key " << PATH_KEY<< "=" << cacheDir << endl);
269  }
270 
271  return cacheDir;
272 }
273 
287 
306 GlobalMetadataStore::get_instance(const string &cache_dir, const string &prefix, unsigned long long size)
307 {
308  if (d_enabled && d_instance == 0) {
309  d_instance = new GlobalMetadataStore(cache_dir, prefix, size); // never returns null_ptr
310  d_enabled = d_instance->cache_enabled();
311  if (!d_enabled) {
312  delete d_instance;
313  d_instance = 0;
314 
315  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
316  }
317  else {
318  AT_EXIT(delete_instance);
319 
320  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
321  }
322  }
323 
324  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance(dir,prefix,size) - d_instance: " << d_instance << endl);
325 
326  return d_instance;
327 }
328 
336 GlobalMetadataStore::get_instance()
337 {
338  if (d_enabled && d_instance == 0) {
339  d_instance = new GlobalMetadataStore(get_cache_dir_from_config(), get_cache_prefix_from_config(),
340  get_cache_size_from_config());
341  d_enabled = d_instance->cache_enabled();
342  if (!d_enabled) {
343  delete d_instance;
344  d_instance = NULL;
345  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
346  }
347  else {
348  AT_EXIT(delete_instance);
349 
350  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
351  }
352  }
353 
354  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance() - d_instance: " << (void *) d_instance << endl);
355 
356  return d_instance;
357 }
359 
363 void
364 GlobalMetadataStore::initialize()
365 {
366  bool found;
367 
368  TheBESKeys::TheKeys()->get_value(LEDGER_KEY, d_ledger_name, found);
369  if (found) {
370  BESDEBUG(DEBUG_KEY, "Located BES key " << LEDGER_KEY << "=" << d_ledger_name << endl);
371  }
372  else {
373  d_ledger_name = default_ledger_name;
374  }
375 
376  ofstream of(d_ledger_name.c_str(), ios::app);
377 
378  // By default, use UTC in the logs.
379  string local_time = "no";
380  TheBESKeys::TheKeys()->get_value(LOCAL_TIME_KEY, local_time, found);
381  d_use_local_time = (local_time == "YES" || local_time == "Yes" || local_time == "yes");
382 }
383 
398 GlobalMetadataStore::GlobalMetadataStore()
400  : BESFileLockingCache(get_cache_dir_from_config(), get_cache_prefix_from_config(), get_cache_size_from_config())
401 {
402  initialize();
403 }
404 
405 GlobalMetadataStore::GlobalMetadataStore(const string &cache_dir, const string &prefix,
406  unsigned long long size) : BESFileLockingCache(cache_dir, prefix, size)
407 {
408  initialize();
409 }
411 
416 static void dump_time(ostream &os, bool use_local_time)
417 {
418  time_t now;
419  time(&now);
420  char buf[sizeof "YYYY-MM-DDTHH:MM:SSzone"];
421  int status = 0;
422 
423  // From StackOverflow:
424  // This will work too, if your compiler doesn't support %F or %T:
425  // strftime(buf, sizeof buf, "%Y-%m-%dT%H:%M:%S%Z", gmtime(&now));
426  //
427  // Apologies for the twisted logic - UTC is the default. Override to
428  // local time using BES.LogTimeLocal=yes in bes.conf. jhrg 11/15/17
429  if (!use_local_time)
430  status = strftime(buf, sizeof buf, "%FT%T%Z", gmtime(&now));
431  else
432  status = strftime(buf, sizeof buf, "%FT%T%Z", localtime(&now));
433 
434  if (!status)
435  LOG("Error getting time for Metadata Store ledger.");
436 
437  os << buf;
438 }
439 
443 void
445 {
446  // open just once, <- done SBL 11.7.19
447 
448  int fd; // value-result parameter;
449  if (get_exclusive_lock(d_ledger_name, fd)) {
450  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Ledger " << d_ledger_name << " write locked." << endl);
451  if (of) {
452  try {
453  dump_time(of, d_use_local_time);
454  of << " " << d_ledger_entry << endl;
455  VERBOSE("MDS Ledger name: '" << d_ledger_name << "', entry: '" << d_ledger_entry + "'.");
456  unlock_and_close(d_ledger_name); // closes fd
457  }
458  catch (...) {
459  unlock_and_close(d_ledger_name);
460  throw;
461  }
462  }
463  else {
464  LOG("Warning: Metadata store could not write to its ledger file.");
465  unlock_and_close(d_ledger_name);
466  }
467  }
468  else {
469  throw BESInternalError("Could not write lock '" + d_ledger_name, __FILE__, __LINE__);
470  }
471 }
472 
479 string
480 GlobalMetadataStore::get_hash(const string &name)
481 {
482  if (name.empty())
483  throw BESInternalError("Empty name passed to the Metadata Store.", __FILE__, __LINE__);
484 
485  return picosha2::hash256_hex_string(name[0] == '/' ? name : "/" + name);
486 }
487 
505 {
506  if (d_dds) {
507  D4BaseTypeFactory factory;
508  DMR dmr(&factory, *d_dds);
509  XMLWriter xml;
510  dmr.print_dap4(xml);
511  os << xml.get_doc();
512  }
513  else if (d_dmr) {
514  XMLWriter xml;
515  d_dmr->print_dap4(xml);
516  os << xml.get_doc();
517  }
518  else {
519  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
520  }
521 }
522 
525  if (d_dds)
526  d_dds->print(os);
527  else if (d_dmr)
528  d_dmr->getDDS()->print(os);
529  else
530  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
531 }
532 
535  if (d_dds)
536  d_dds->print_das(os);
537  else if (d_dmr)
538  d_dmr->getDDS()->print_das(os);
539  else
540  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
541 }
542 
557 bool
558 GlobalMetadataStore::store_dap_response(StreamDAP &writer, const string &key, const string &name,
559  const string &response_name)
560 {
561  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " BEGIN " << key << endl);
562 
563  string item_name = get_cache_file_name(key, false /*mangle*/);
564 
565  int fd;
566  if (create_and_lock(item_name, fd)) {
567  BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Storing " << item_name << endl);
568 
569  // Get an output stream directed at the locked cache file
570  ofstream response(item_name.c_str(), ios::out|ios::app);
571  if (!response.is_open())
572  throw BESInternalError("Could not open '" + key + "' to write the response.", __FILE__, __LINE__);
573 
574  try {
575  // for the different writers, look at the StreamDAP struct in the class
576  // definition. jhrg 2.27.18
577  writer(response); // different writers can write the DDS, DAS or DMR
578 
579  // Compute/update/maintain the cache size? This is extra work
580  // that might never be used. It also locks the cache...
581  if (!is_unlimited() || MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED) {
582  // This enables the call to update_cache_info() below.
584 
585  unsigned long long size = update_cache_info(item_name);
586  if (!is_unlimited() && cache_too_big(size)) update_and_purge(item_name);
587  }
588 
589  unlock_and_close(item_name);
590  }
591  catch (...) {
592  // Bummer. There was a problem doing The Stuff. Now we gotta clean up.
593  response.close();
594  this->purge_file(item_name);
595  unlock_and_close(item_name);
596  throw;
597  }
598 
599  VERBOSE("Metadata store: Wrote " << response_name << " response for '" << name << "'." << endl);
600  d_ledger_entry.append(" ").append(key);
601 
602  return true;
603  }
604  else if (get_read_lock(item_name, fd)) {
605  // We found the key; didn't add this because it was already here
606  BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Found " << item_name << " in the store already." << endl);
607  unlock_and_close(item_name);
608 
609  LOG("Metadata store: unable to store the " << response_name << " response for '" << name << "'." << endl);
610 
611  return false;
612  }
613  else {
614  throw BESInternalError("Could neither create or open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
615  }
616 }
617 
631 
644 bool
645 GlobalMetadataStore::add_responses(DDS *dds, const string &name)
646 {
647  // Start the index entry
648  d_ledger_entry = string("add DDS ").append(name);
649 
650  // I'm appending the 'dds r' string to the name before hashing so that
651  // the different hashes for the file's DDS, DAS, ..., are all very different.
652  // This will be useful if we use S3 instead of EFS for the Metadata Store.
653  //
654  // The helper() also updates the ledger string.
655  StreamDDS write_the_dds_response(dds);
656  bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
657 
658  StreamDAS write_the_das_response(dds);
659  bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
660 
661 #if SYMETRIC_ADD_RESPONSES
662  StreamDMR write_the_dmr_response(dds);
663  bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
664 #endif
665 
666  write_ledger(); // write the index line
667 
668 #if SYMETRIC_ADD_RESPONSES
669  return (stored_dds && stored_das && stored_dmr);
670 #else
671  return (stored_dds && stored_das);
672 #endif
673 }
674 
686 bool
687 GlobalMetadataStore::add_responses(DMR *dmr, const string &name)
688 {
689  // Start the index entry
690  d_ledger_entry = string("add DMR ").append(name);
691 
692  // I'm appending the 'dds r' string to the name before hashing so that
693  // the different hashes for the file's DDS, DAS, ..., are all very different.
694  // This will be useful if we use S3 instead of EFS for the Metadata Store.
695  //
696  // The helper() also updates the ledger string.
697 #if SYMETRIC_ADD_RESPONSES
698  StreamDDS write_the_dds_response(dmr);
699  bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
700 
701  StreamDAS write_the_das_response(dmr);
702  bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
703 #endif
704 
705  StreamDMR write_the_dmr_response(dmr);
706  bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
707 
708  write_ledger(); // write the index line
709 
710 #if SYMETRIC_ADD_RESPONSES
711  return (stored_dds && stored_das && stored_dmr);
712 #else
713  return(stored_dmr /* && stored_dmrpp */);
714 #endif
715 }
717 
731 GlobalMetadataStore::get_read_lock_helper(const string &name, const string &suffix, const string &object_name)
732 {
733  BESDEBUG(DEBUG_KEY, __func__ << "() MDS hashing name '" << name << "', '" << suffix << "'"<< endl);
734 
735  if (name.empty())
736  throw BESInternalError("An empty name string was received by "
737  "GlobalMetadataStore::get_read_lock_helper(). That should never happen.", __FILE__, __LINE__);
738 
739  string item_name = get_cache_file_name(get_hash(name + suffix), false);
740  int fd;
741  MDSReadLock lock(item_name, get_read_lock(item_name, fd), this);
742  BESDEBUG(DEBUG_KEY, __func__ << "() MDS lock for " << item_name << ": " << lock() << endl);
743 
744  if (lock())
745  LOG("MDS Cache hit for '" << name << "' and response " << object_name << endl);
746  else
747  LOG("MDS Cache miss for '" << name << "' and response " << object_name << endl);
748 
749  return lock;
750  }
751 
772 {
773  return get_read_lock_helper(name,"dmr_r","DMR");
774 }//end is_dmr_available(string)
775 
778 {
779  //call get_read_lock_helper
780  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(), "dmr_r", "DMR");
781  if (lock()){
782 
783  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmr_r");
784 
785  if(reload){
786  lock.clearLock();
787  return lock;
788  }//end if
789  else{
790  return lock;
791  }//end else
792 
793  }//end if(is locked)
794  else{
795  return lock;
796  }//end else
797 
798 }//end is_dmr_available(BESContainer)
799 
801 GlobalMetadataStore::is_dmr_available(const std::string &realName, const std::string &relativeName, const std::string &fileType)
802 {
803  //call get_read_lock_helper
804  MDSReadLock lock = get_read_lock_helper(relativeName,"dmr_r","DMR");
805  if (lock()){
806 
807  bool reload = is_available_helper(realName, relativeName, fileType, "dmr_r");
808 
809  if(reload){
810  lock.clearLock();
811  return lock;
812  }//end if
813  else{
814  return lock;
815  }//end else
816 
817  }//end if(is locked)
818  else{
819  return lock;
820  }//end else
821 
822 }//end is_dmr_available(string, string, string)
823 
833 {
834  return get_read_lock_helper(name,"dds_r","DDS");
835 }//end is_dds_available(string)
836 
849 {
850  //call get_read_lock_helper
851  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dds_r","DDS");
852  if (lock()){
853 
854  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dds_r");
855 
856  if(reload){
857  lock.clearLock();
858  return lock;
859  }//end if
860  else{
861  return lock;
862  }//end else
863 
864  }//end if(is locked)
865  else{
866  return lock;
867  }//end else
868 
869 }//end is_dds_available(BESContainer)
870 
880 {
881  return get_read_lock_helper(name,"das_r","DAS");
882 }//end is_das_available(string)
883 
886 {
887  //return get_read_lock_helper(name, "das_r", "DAS");
888  //call get_read_lock_helper
889  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"das_r","DAS");
890  if (lock()){
891 
892  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "das_r");
893 
894  if(reload){
895  lock.clearLock();
896  return lock;
897  }//end if
898  else{
899  return lock;
900  }//end else
901 
902  }//end if(is locked)
903  else{
904  return lock;
905  }//end else
906 
907 }//end is_das_available(BESContainer)
908 
929 {
930  return get_read_lock_helper(name,"dmrpp_r","DMR++");
931 }//end is_dmrpp_available(string)
932 
935 {
936  //return get_read_lock_helper(name, "dmrpp_r", "DMR++");
937  //call get_read_lock_helper
938  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dmrpp_r","DMR++");
939  if (lock()){
940 
941  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmrpp_r");
942 
943  if(reload){
944  lock.clearLock();
945  return lock;
946  }//end if
947  else{
948  return lock;
949  }//end else
950 
951  }//end if(is locked)
952  else{
953  return lock;
954  }//end else
955 
956 }//end is_dmrpp_available(BESContainer)
957 
968 bool
969 GlobalMetadataStore::is_available_helper(const string &realName, const string &relativeName, const string &fileType, const string &suffix)
970 {
971  //use type with find_handler() to get handler
972  BESRequestHandler *besRH = BESRequestHandlerList::TheList()->find_handler(fileType);
973 
974  //use handler.get_lmt()
975  time_t file_time = besRH->get_lmt(realName);
976 
977  //get the cache time of the handler
978  time_t cache_time = get_cache_lmt(relativeName, suffix);
979 
980  //compare file lmt and time of creation of cache
981  if (file_time > cache_time){
982  return true;
983  }//end if(file > cache)
984  else {
985  return false;
986  }//end else
987 }
988 
996 time_t
997 GlobalMetadataStore::get_cache_lmt(const string &fileName, const string &suffix)
998 {
999  string item_name = get_cache_file_name(get_hash(fileName + suffix), false);
1000  struct stat statbuf;
1001 
1002  if (stat(item_name.c_str(), &statbuf) == -1){
1003  throw BESNotFoundError(strerror(errno), __FILE__, __LINE__);
1004  }//end if(error)
1005 
1006  return statbuf.st_mtime;
1007 }//end get_cache_lmt()
1008 
1011 
1019 void
1020 GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &object_name)
1021 {
1022  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1023  int fd; // value-result parameter;
1024  if (get_read_lock(item_name, fd)) {
1025  VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
1026  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1027  try {
1028  transfer_bytes(fd, os);
1029  unlock_and_close(item_name); // closes fd
1030  }
1031  catch (...) {
1032  unlock_and_close(item_name);
1033  throw;
1034  }
1035  }
1036  else {
1037  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1038  }
1039 }
1040 
1049 void
1050 GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &xml_base,
1051  const string &object_name)
1052 {
1053  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1054  int fd; // value-result parameter;
1055  if (get_read_lock(item_name, fd)) {
1056  VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
1057  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1058  try {
1059  insert_xml_base(fd, os, xml_base);
1060 
1061  transfer_bytes(fd, os);
1062  unlock_and_close(item_name); // closes fd
1063  }
1064  catch (...) {
1065  unlock_and_close(item_name);
1066  throw;
1067  }
1068  }
1069  else {
1070  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1071  }
1072 }
1074 
1081 void
1082 GlobalMetadataStore::write_dds_response(const std::string &name, ostream &os)
1083 {
1084  write_response_helper(name, os, "dds_r", "DDS");
1085 }
1086 
1093 void
1094 GlobalMetadataStore::write_das_response(const std::string &name, ostream &os)
1095 {
1096  write_response_helper(name, os, "das_r", "DAS");
1097 }
1098 
1105 void
1106 GlobalMetadataStore::write_dmr_response(const std::string &name, ostream &os)
1107 {
1108  bool found = false;
1109  string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1110  if (!found) {
1111 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1112  write_response_helper(name, os, "dmr_r", "DMR");
1113 #else
1114  throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1115 #endif
1116  }
1117  else {
1118  write_response_helper(name, os, "dmr_r", xml_base, "DMR");
1119  }
1120 }
1121 
1128 void
1129 GlobalMetadataStore::write_dmrpp_response(const std::string &name, ostream &os)
1130 {
1131  bool found = false;
1132  string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1133  if (!found) {
1134 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1135  write_response_helper(name, os, "dmrpp_r", "DMR++");
1136 #else
1137  throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1138 #endif
1139  }
1140  else {
1141  write_response_helper(name, os, "dmrpp_r", xml_base, "DMR++");
1142  }
1143 }
1144 
1152 bool
1153 GlobalMetadataStore::remove_response_helper(const string& name, const string &suffix, const string &object_name)
1154 {
1155  string hash = get_hash(name + suffix);
1156  if (unlink(get_cache_file_name(hash, false).c_str()) == 0) {
1157  VERBOSE("Metadata store: Removed " << object_name << " response for '" << hash << "'." << endl);
1158  d_ledger_entry.append(" ").append(hash);
1159  return true;
1160  }
1161  else {
1162  LOG("Metadata store: unable to remove the " << object_name << " response for '" << name << "' (" << strerror(errno) << ")."<< endl);
1163  }
1164 
1165  return false;
1166 }
1167 
1174 bool
1176 {
1177  // Start the index entry
1178  d_ledger_entry = string("remove ").append(name);
1179 
1180  bool removed_dds = remove_response_helper(name, "dds_r", "DDS");
1181 
1182  bool removed_das = remove_response_helper(name, "das_r", "DAS");
1183 
1184  bool removed_dmr = remove_response_helper(name, "dmr_r", "DMR");
1185 
1186  bool removed_dmrpp = remove_response_helper(name, "dmrpp_r", "DMR++");
1187 
1188  write_ledger(); // write the index line
1189 
1190 #if SYMETRIC_ADD_RESPONSES
1191  return (removed_dds && removed_das && removed_dmr);
1192 #else
1193  return (removed_dds || removed_das || removed_dmr || removed_dmrpp);
1194 #endif
1195 }
1196 
1209 DMR *
1211 {
1212  stringstream oss;
1213  write_dmr_response(name, oss); // throws BESInternalError if not found
1214 
1215  D4BaseTypeFactory d4_btf;
1216  auto_ptr<DMR> dmr(new DMR(&d4_btf, "mds"));
1217 
1218  D4ParserSax2 parser;
1219  parser.intern(oss.str(), dmr.get());
1220 
1221  dmr->set_factory(0);
1222 
1223  return dmr.release();
1224 }
1225 
1247 DDS *
1249 {
1250  TempFile dds_tmp(get_cache_directory() + "/opendapXXXXXX");
1251 
1252  fstream dds_fs(dds_tmp.get_name().c_str(), std::fstream::out);
1253  try {
1254  write_dds_response(name, dds_fs); // throws BESInternalError if not found
1255  dds_fs.close();
1256  }
1257  catch (...) {
1258  dds_fs.close();
1259  throw;
1260  }
1261 
1262  BaseTypeFactory btf;
1263  auto_ptr<DDS> dds(new DDS(&btf));
1264  dds->parse(dds_tmp.get_name());
1265 
1266  TempFile das_tmp(get_cache_directory() + "/opendapXXXXXX");
1267  fstream das_fs(das_tmp.get_name().c_str(), std::fstream::out);
1268  try {
1269  write_das_response(name, das_fs); // throws BESInternalError if not found
1270  das_fs.close();
1271  }
1272  catch (...) {
1273  das_fs.close();
1274  throw;
1275  }
1276 
1277  auto_ptr<DAS> das(new DAS());
1278  das->parse(das_tmp.get_name());
1279 
1280  dds->transfer_attributes(das.get());
1281  dds->set_factory(0);
1282 
1283  return dds.release();
1284 }
1285 
1286 
1287 void
1288 GlobalMetadataStore::parse_das_from_mds(libdap::DAS* das, const std::string &name) {
1289  string suffix = "das_r";
1290  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1291  int fd; // value-result parameter;
1292  if (get_read_lock(item_name, fd)) {
1293  VERBOSE("Metadata store: Cache hit: read " << " response for '" << name << "'." << endl);
1294  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1295  try {
1296  // Just generate the DAS by parsing from the file
1297  das->parse(item_name);
1298  unlock_and_close(item_name); // closes fd
1299  }
1300  catch (...) {
1301  unlock_and_close(item_name);
1302  throw;
1303  }
1304  }
1305  else {
1306  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1307  }
1308 
1309 }
1310 
BESRequestHandler
Represents a specific data type request handler.
Definition: BESRequestHandler.h:74
BESFileLockingCache::get_read_lock
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
Definition: BESFileLockingCache.cc:544
bes::GlobalMetadataStore::MDSReadLock
Unlock and close the MDS item when the ReadLock goes out of scope.
Definition: GlobalMetadataStore.h:193
BESInternalFatalError
exception thrown if an internal error is found and is fatal to the BES
Definition: BESInternalFatalError.h:43
bes::GlobalMetadataStore::StreamDDS::operator()
virtual void operator()(std::ostream &os)
Definition: GlobalMetadataStore.cc:524
bes::GlobalMetadataStore::is_das_available
virtual MDSReadLock is_das_available(const std::string &name)
Is the DAS response for.
Definition: GlobalMetadataStore.cc:879
bes::GlobalMetadataStore::get_dmr_object
virtual libdap::DMR * get_dmr_object(const std::string &name)
Build a DMR object from the cached Response.
Definition: GlobalMetadataStore.cc:1210
bes::GlobalMetadataStore::StreamDMR
Instantiate with a DDS or DMR and use to write the DMR response.
Definition: GlobalMetadataStore.h:162
bes::GlobalMetadataStore::insert_xml_base
static void insert_xml_base(int fd, std::ostream &os, const std::string &xml_base)
like transfer_bytes(), but adds the xml:base attribute to the DMR/++
Definition: GlobalMetadataStore.cc:155
BESNotFoundError
error thrown if the resource requested cannot be found
Definition: BESNotFoundError.h:40
bes::GlobalMetadataStore::write_dmr_response
virtual void write_dmr_response(const std::string &name, std::ostream &os)
Write the stored DMR response to a stream.
Definition: GlobalMetadataStore.cc:1106
bes::GlobalMetadataStore::remove_response_helper
bool remove_response_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
Definition: GlobalMetadataStore.cc:1153
BESFileLockingCache::create_and_lock
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
Definition: BESFileLockingCache.cc:599
bes::GlobalMetadataStore::get_cache_lmt
virtual time_t get_cache_lmt(const std::string &fileName, const std::string &suffix)
Get the last modified time for the cached object file.
Definition: GlobalMetadataStore.cc:997
bes::GlobalMetadataStore::write_dmrpp_response
virtual void write_dmrpp_response(const std::string &name, std::ostream &os)
Write the stored DMR++ response to a stream.
Definition: GlobalMetadataStore.cc:1129
BESContainer::get_container_type
std::string get_container_type() const
retrieve the type of data this container holds, such as cedar or netcdf.
Definition: BESContainer.h:232
BESContainer::get_relative_name
std::string get_relative_name() const
Get the relative name of the object in this container.
Definition: BESContainer.h:186
BESFileLockingCache::is_unlimited
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
Definition: BESFileLockingCache.h:177
bes::GlobalMetadataStore::transfer_bytes
static void transfer_bytes(int fd, std::ostream &os)
Definition: GlobalMetadataStore.cc:119
bes::GlobalMetadataStore::StreamDMR::operator()
virtual void operator()(std::ostream &os)
Use an object (DDS or DMR) to write data to the MDS.
Definition: GlobalMetadataStore.cc:504
BESFileLockingCache::purge_file
virtual void purge_file(const std::string &file)
Purge a single file from the cache.
Definition: BESFileLockingCache.cc:1085
bes::GlobalMetadataStore::write_das_response
virtual void write_das_response(const std::string &name, std::ostream &os)
Write the stored DAS response to a stream.
Definition: GlobalMetadataStore.cc:1094
bes::GlobalMetadataStore::StreamDAS::operator()
virtual void operator()(std::ostream &os)
Definition: GlobalMetadataStore.cc:534
libdap
Definition: BESDapFunctionResponseCache.h:35
BESFileLockingCache::unlock_and_close
virtual void unlock_and_close(const std::string &target)
Definition: BESFileLockingCache.cc:713
bes::GlobalMetadataStore
Store the DAP metadata responses.
Definition: GlobalMetadataStore.h:89
bes::GlobalMetadataStore::GlobalMetadataStore
GlobalMetadataStore()
Definition: GlobalMetadataStore.cc:399
bes::GlobalMetadataStore::remove_responses
virtual bool remove_responses(const std::string &name)
Remove all cached responses and objects for a granule.
Definition: GlobalMetadataStore.cc:1175
TheBESKeys::TheKeys
static TheBESKeys * TheKeys()
Definition: TheBESKeys.cc:62
bes::GlobalMetadataStore::add_responses
virtual bool add_responses(libdap::DDS *dds, const std::string &name)
Add the DAP2 metadata responses using a DDS.
Definition: GlobalMetadataStore.cc:645
bes::GlobalMetadataStore::StreamDDS
Instantiate with a DDS or DMR and use to write the DDS response.
Definition: GlobalMetadataStore.h:146
bes::TempFile::get_name
std::string get_name() const
Definition: TempFile.h:70
BESInternalError
exception thrown if internal error encountered
Definition: BESInternalError.h:43
bes::GlobalMetadataStore::write_dds_response
virtual void write_dds_response(const std::string &name, std::ostream &os)
Write the stored DDS response to a stream.
Definition: GlobalMetadataStore.cc:1082
BESFileLockingCache
Implementation of a caching mechanism for compressed data.
Definition: BESFileLockingCache.h:85
TheBESKeys::get_value
void get_value(const std::string &s, std::string &val, bool &found)
Retrieve the value of a given key, if set.
Definition: TheBESKeys.cc:272
bes::GlobalMetadataStore::get_read_lock_helper
MDSReadLock get_read_lock_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
Definition: GlobalMetadataStore.cc:731
bes::GlobalMetadataStore::get_dds_object
virtual libdap::DDS * get_dds_object(const std::string &name)
Build a DDS object from the cached Response.
Definition: GlobalMetadataStore.cc:1248
bes::GlobalMetadataStore::initialize
void initialize()
Configure the ledger using LEDGER_KEY and LOCAL_TIME_KEY.
Definition: GlobalMetadataStore.cc:364
bes::GlobalMetadataStore::is_dds_available
virtual MDSReadLock is_dds_available(const std::string &name)
Is the DDS response for the given name in the MDS?
Definition: GlobalMetadataStore.cc:832
bes::GlobalMetadataStore::StreamDAS
Instantiate with a DDS or DMR and use to write the DAS response.
Definition: GlobalMetadataStore.h:154
BESFileLockingCache::update_cache_info
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
Definition: BESFileLockingCache.cc:737
BESFileLockingCache::get_cache_file_name
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
Definition: BESFileLockingCache.cc:451
BESContainer
A container is something that holds data. E.G., a netcdf file or a database entry.
Definition: BESContainer.h:65
bes::GlobalMetadataStore::get_hash
std::string get_hash(const std::string &name)
Definition: GlobalMetadataStore.cc:480
bes::GlobalMetadataStore::StreamDAP
Definition: GlobalMetadataStore.h:132
bes::GlobalMetadataStore::is_dmrpp_available
virtual MDSReadLock is_dmrpp_available(const std::string &name)
Is the DMR++ response for the given name in the MDS?
Definition: GlobalMetadataStore.cc:928
BESContainer::get_real_name
std::string get_real_name() const
retrieve the real name for this container, such as a file name.
Definition: BESContainer.h:180
BESFileLockingCache::update_and_purge
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
Definition: BESFileLockingCache.cc:940
BESUtil::lowercase
static std::string lowercase(const std::string &s)
Definition: BESUtil.cc:200
BESContextManager::get_context
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
Definition: BESContextManager.cc:77
bes::GlobalMetadataStore::is_available_helper
virtual bool is_available_helper(const std::string &realName, const std::string &relativeName, const std::string &fileType, const std::string &suffix)
Helper function that checks whether the last modified time is greater than that of the cached file.
Definition: GlobalMetadataStore.cc:969
bes::GlobalMetadataStore::write_ledger
void write_ledger()
Definition: GlobalMetadataStore.cc:444
bes::TempFile
Get a new temporary file.
Definition: TempFile.h:46
bes::GlobalMetadataStore::is_dmr_available
virtual MDSReadLock is_dmr_available(const std::string &name)
Is the DMR response for the given name in the MDS?
Definition: GlobalMetadataStore.cc:771
BESRequestHandlerList::find_handler
virtual BESRequestHandler * find_handler(const std::string &handler_name)
find and return the specified request handler
Definition: BESRequestHandlerList.cc:95
bes::GlobalMetadataStore::store_dap_response
bool store_dap_response(StreamDAP &writer, const std::string &key, const std::string &name, const std::string &response_name)
Definition: GlobalMetadataStore.cc:558
BESRequestHandler::get_lmt
virtual time_t get_lmt(const std::string &name)
Get the last modified time for the given name.
Definition: BESRequestHandler.cc:133
BESFileLockingCache::get_cache_directory
const std::string get_cache_directory()
Definition: BESFileLockingCache.h:188
BESFileLockingCache::cache_too_big
virtual bool cache_too_big(unsigned long long current_size) const
Look at the cache size and see if it is too big.
Definition: BESFileLockingCache.cc:780
bes::GlobalMetadataStore::write_response_helper
void write_response_helper(const std::string &name, std::ostream &os, const std::string &suffix, const std::string &object_name)
Definition: GlobalMetadataStore.cc:1020
BESFileLockingCache::exclusive_to_shared_lock
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
Definition: BESFileLockingCache.cc:630