32 #ifdef HAVE_TR1_FUNCTIONAL
33 #include <tr1/functional>
41 #include <DapXmlNamespaces.h>
42 #include <ConstraintEvaluator.h>
43 #include <DDXParserSAX2.h>
47 #include <D4EnumDefs.h>
48 #include <D4Dimensions.h>
51 #include <D4ParserSax2.h>
58 #ifdef DAP2_STORED_RESULTS
59 #include <XDRStreamMarshaller.h>
60 #include <XDRStreamUnMarshaller.h>
63 #include <chunked_istream.h>
64 #include <D4StreamUnMarshaller.h>
67 #include <mime_util.h>
70 #include "BESStoredDapResultCache.h"
71 #include "BESDapResponseBuilder.h"
72 #include "BESInternalError.h"
75 #include "TheBESKeys.h"
78 #ifdef HAVE_TR1_FUNCTIONAL
79 #define HASH_OBJ std::tr1::hash
81 #define HASH_OBJ std::hash
85 #define BES_DATA_ROOT "BES.Data.RootDirectory"
86 #define BES_CATALOG_ROOT "BES.Catalog.catalog.RootDirectory"
// Class-wide enable flag for the stored-result cache; it starts out true.
92 bool BESStoredDapResultCache::d_enabled =
true;
// Names of the BES configuration keys read by the get*FromConfig() helpers:
// the cache sub-directory, the result-file prefix, and the cache size.
93 const string BESStoredDapResultCache::SUBDIR_KEY =
"DAP.StoredResultsCache.subdir";
94 const string BESStoredDapResultCache::PREFIX_KEY =
"DAP.StoredResultsCache.prefix";
95 const string BESStoredDapResultCache::SIZE_KEY =
"DAP.StoredResultsCache.size";
// Returns the cache size configured under SIZE_KEY, parsed as an unsigned
// integer (the local is named size_in_megabytes).
// NOTE(review): the statements that fetch the key value and test whether it
// was found are not visible in this excerpt.
97 unsigned long BESStoredDapResultCache::getCacheSizeFromConfig()
101 unsigned long size_in_megabytes = 0;
// Convert the textual key value to a number.
104 istringstream iss(size);
105 iss >> size_in_megabytes;
// Diagnostic for the missing-key case. It names this function (as
// getSubDirFromConfig() does) so the log points at the real call site.
108 string msg =
"[ERROR] BESStoredDapResultCache::getCacheSizeFromConfig() - The BES Key " + SIZE_KEY
109 +
" is not set! It MUST be set to utilize the Stored Result Caching system. ";
110 BESDEBUG(
"cache", msg << endl);
113 return size_in_megabytes;
// Returns the cache sub-directory configured under SUBDIR_KEY with any
// leading '/' characters removed.
// NOTE(review): the statements that fetch the key value and test whether it
// was found are not visible in this excerpt.
116 string BESStoredDapResultCache::getSubDirFromConfig()
// Diagnostic for the missing-key case.
123 string msg =
"[ERROR] BESStoredDapResultCache::getSubDirFromConfig() - The BES Key " + SUBDIR_KEY
124 +
" is not set! It MUST be set to utilize the Stored Result Caching system. ";
125 BESDEBUG(
"cache", msg << endl);
// Strip leading slashes. The length test must come first: dereferencing
// begin() of an empty string reads end(), which is undefined behaviour.
129 while (subdir.length() > 0 && *subdir.begin() == '/') {
130 subdir = subdir.substr(1);
// Reads the result-file prefix configured under PREFIX_KEY.
// NOTE(review): the statements that fetch the key value, test whether it was
// found, and return the prefix are not visible in this excerpt.
139 string BESStoredDapResultCache::getResultPrefixFromConfig()
// Diagnostic for the missing-key case. It names this function (as
// getSubDirFromConfig() does) so the log points at the real call site.
148 string msg =
"[ERROR] BESStoredDapResultCache::getResultPrefixFromConfig() - The BES Key " + PREFIX_KEY
149 +
" is not set! It MUST be set to utilize the Stored Result Caching system. ";
150 BESDEBUG(
"cache", msg << endl);
// Determines the BES data root directory from the BES_CATALOG_ROOT or
// BES_DATA_ROOT configuration keys.
// NOTE(review): the key look-ups and the return statement are not visible in
// this excerpt; only the local and the neither-key-set diagnostic are.
157 string BESStoredDapResultCache::getBesDataRootDirFromConfig()
160 string cacheDir =
"";
// Diagnostic used when neither key is set. It names this function, and a
// space precedes "or" so the first key name does not run into the text.
165 string msg = ((string)
"[ERROR] BESStoredDapResultCache::getBesDataRootDirFromConfig() - Neither the BES Key ")
166 + BES_CATALOG_ROOT +
" or the BES key " + BES_DATA_ROOT
167 +
" have been set! One MUST be set to utilize the Stored Result Caching system. ";
168 BESDEBUG(
"cache", msg << endl);
// Default constructor: pulls the cache configuration from the BES keys via
// the get*FromConfig() helpers and hands it to initialize().
176 BESStoredDapResultCache::BESStoredDapResultCache()
178 BESDEBUG(
"cache",
"BESStoredDapResultCache::BESStoredDapResultCache() - BEGIN" << endl);
// Load each setting into its member.
180 d_storedResultsSubdir = getSubDirFromConfig();
181 d_dataRootDir = getBesDataRootDirFromConfig();
184 d_resultFilePrefix = getResultPrefixFromConfig();
185 d_maxCacheSize = getCacheSizeFromConfig();
// NOTE(review): the declaration of resultsDir is not visible in this
// excerpt; presumably it joins d_dataRootDir and d_storedResultsSubdir, as
// the four-argument constructor does - confirm.
188 "BESStoredDapResultCache() - Stored results cache configuration params: " << resultsDir <<
", " << d_resultFilePrefix <<
", " << d_maxCacheSize << endl);
// Set up the cache with the resolved directory, prefix and size.
190 initialize(resultsDir, d_resultFilePrefix, d_maxCacheSize);
192 BESDEBUG(
"cache",
"BESStoredDapResultCache::BESStoredDapResultCache() - END" << endl);
// Constructor taking explicit settings instead of reading the BES keys.
//   data_root_dir         - root directory under which the cache lives
//   stored_results_subdir - sub-directory of the root that holds the results
//   result_file_prefix    - prefix passed to initialize() for cache files
//   max_cache_size        - size limit passed to initialize()
198 BESStoredDapResultCache::BESStoredDapResultCache(
const string &data_root_dir,
const string &stored_results_subdir,
199 const string &result_file_prefix,
unsigned long long max_cache_size)
// Remember the settings in the members.
202 d_storedResultsSubdir = stored_results_subdir;
203 d_dataRootDir = data_root_dir;
204 d_resultFilePrefix = result_file_prefix;
205 d_maxCacheSize = max_cache_size;
// The cache directory is the data root joined with the sub-directory.
206 initialize(
BESUtil::assemblePath(d_dataRootDir, stored_results_subdir), d_resultFilePrefix, d_maxCacheSize);
211 const string &result_file_prefix,
unsigned long long max_cache_size)
213 if (d_enabled && d_instance == 0) {
214 if (dir_exists(data_root_dir)) {
217 d_enabled = d_instance->cache_enabled();
221 BESDEBUG(
"cache",
"BESStoredDapResultCache::"<<__func__ <<
"() - " <<
222 "Cache is DISABLED"<< endl);
226 atexit(delete_instance);
228 BESDEBUG(
"cache",
"BESStoredDapResultCache::"<<__func__ <<
"() - " <<
229 "Cache is ENABLED"<< endl);
242 if (d_enabled && d_instance == 0) {
244 d_enabled = d_instance->cache_enabled();
248 BESDEBUG(
"cache",
"BESStoredDapResultCache::"<<__func__ <<
"() - " <<
249 "Cache is DISABLED"<< endl);
253 atexit(delete_instance);
255 BESDEBUG(
"cache",
"BESStoredDapResultCache::"<<__func__ <<
"() - " <<
256 "Cache is ENABLED"<< endl);
// Decides whether a cached result file can still be used for a dataset.
//   cache_file_name - path of the cached result file
//   dataset         - path of the source dataset the result was built from
// Returns false when the cache entry has size zero or when the dataset's
// modification time is later than the cache entry's.
// NOTE(review): the struct stat declaration, the stat() failure branches and
// the final return are not visible in this excerpt.
272 bool BESStoredDapResultCache::is_valid(
const string &cache_file_name,
const string &dataset)
277 off_t entry_size = 0;
278 time_t entry_time = 0;
// Record the cache entry's size and modification time.
280 if (stat(cache_file_name.c_str(), &buf) == 0) {
281 entry_size = buf.st_size;
282 entry_time = buf.st_mtime;
// A zero-length entry is never valid.
288 if (entry_size == 0)
return false;
// Start from the entry's own time; it is replaced by the dataset's
// modification time when the dataset can be stat'ed.
290 time_t dataset_time = entry_time;
291 if (stat(dataset.c_str(), &buf) == 0) {
292 dataset_time = buf.st_mtime;
// A dataset modified after the entry was written makes the entry stale.
300 if (dataset_time > entry_time)
return false;
305 #ifdef DAP2_STORED_RESULTS
// Reads a stored DAP2 data-DDX response from a cache file into a DDS.
//   cache_file_name - path of the cached multipart response
//   fdds            - DDS that receives the parsed DDX and the deserialized
//                     variable values; its factory is used by the parser
// NOTE(review): the try/catch structure, the declarations of fd and
// data_cid, and the return statements are not visible in this excerpt.
317 bool BESStoredDapResultCache::read_dap2_data_from_cache(
const string &cache_file_name, DDS *fdds)
320 "BESStoredDapResultCache::read_dap2_data_from_cache() - Opening cache file: " << cache_file_name << endl);
// Only proceed when a read lock on the cache file can be taken.
325 if (get_read_lock(cache_file_name, fd)) {
327 ifstream data(cache_file_name.c_str());
// Consume MIME headers until an empty one is returned.
330 string mime = get_next_mime_header(data);
331 while (!mime.empty()) {
332 mime = get_next_mime_header(data);
336 DDXParser ddx_parser(fdds->get_factory());
// Read the multipart boundary, then the headers of the DDX (text/xml) part.
339 string boundary = read_multipart_boundary(data);
341 "BESStoredDapResultCache::read_dap2_data_from_cache() - MPM Boundary: " << boundary << endl);
343 read_multipart_headers(data,
"text/xml", dods_ddx);
346 "BESStoredDapResultCache::read_dap2_data_from_cache() - Read the multipart headers" << endl);
// Parse the DDX into fdds; the parser also hands back the data part's CID.
352 ddx_parser.intern_stream(data, fdds, data_cid, boundary);
354 "BESStoredDapResultCache::read_dap2_data_from_cache() - Dataset name: " << fdds->get_dataset_name() << endl);
358 "BESStoredDapResultCache::read_dap2_data_from_cache() - DDX Parser Error: " << e.get_error_message() << endl);
// Convert the CID to its header form before matching the data part.
364 "BESStoredDapResultCache::read_dap2_data_from_cache() - Data CID (before): " << data_cid << endl);
365 data_cid = cid_to_header_value(data_cid);
367 "BESStoredDapResultCache::read_dap2_data_from_cache() - Data CID (after): " << data_cid << endl);
371 read_multipart_headers(data,
"application/octet-stream", dods_data_ddx, data_cid);
// Deserialize every top-level variable from the binary part.
376 XDRStreamUnMarshaller um(data);
377 for (DDS::Vars_iter i = fdds->var_begin(); i != fdds->var_end(); i++) {
378 (*i)->deserialize(um, fdds);
382 unlock_and_close(cache_file_name );
// Terminate the debug line like every other BESDEBUG call in this file.
386 BESDEBUG(
"cache",
"BESStoredDapResultCache - The requested file does not exist. File: " + cache_file_name << endl);
// The log message names this (DAP2) function, not the DAP4 sibling.
393 "BESStoredDapResultCache::read_dap2_data_from_cache() - caught exception, unlocking cache and re-throw." << endl);
// Release the lock only if one was actually obtained.
395 if (fd != -1) unlock_and_close(cache_file_name );
// Reads a stored DAP4 data response from a cache file into a DMR.
//   cache_file_name - path of the cached chunked response
//   dmr             - DMR that receives the parsed metadata and the
//                     deserialized values
// Throws InternalErr when the first chunk cannot be read.
// NOTE(review): the try/catch structure, the declarations of fd, parser and
// debug, and the return statements are not visible in this excerpt.
412 bool BESStoredDapResultCache::read_dap4_data_from_cache(
const string &cache_file_name, libdap::DMR *dmr)
414 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - BEGIN" << endl);
// Only proceed when a read lock on the cache file can be taken.
419 if (get_read_lock(cache_file_name, fd)) {
421 "BESStoredDapResultCache::read_dap4_data_from_cache() - Opening cache file: " << cache_file_name << endl);
422 fstream in(cache_file_name.c_str(), ios::in | ios::binary);
433 chunked_istream cis(in, CHUNK_SIZE);
// The first chunk is handed to the parser below as the DMR document.
440 int chunk_size = cis.read_next_chunk();
443 "BESStoredDapResultCache::read_dap4_data_from_cache() - First chunk_size: " << chunk_size << endl);
445 if (chunk_size == EOF) {
446 throw InternalErr(__FILE__, __LINE__,
447 "BESStoredDapResultCache::read_dap4_data_from_cache() - Failed to read first chunk from file. Chunk size = EOF (aka "
448 + libdap::long_to_string(EOF) +
")");
// NOTE(review): this is a variable-length array, a compiler extension in
// C++ that places chunk_size bytes on the stack; a heap buffer would be
// safer - confirm <vector> is available and switch.
452 char chunk[chunk_size];
453 cis.read(chunk, chunk_size);
454 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - Read first chunk." << endl);
// NOTE(review): the last two bytes of the chunk are excluded from the parse
// (presumably a trailing CRLF - confirm); chunk_size < 2 is not guarded here.
459 parser.intern(chunk, chunk_size - 2, dmr, debug);
460 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - Parsed first chunk." << endl);
// The remaining chunks carry the serialized values.
462 D4StreamUnMarshaller um(cis, cis.twiddle_bytes());
464 dmr->root()->deserialize(um, *dmr);
465 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - Deserialized data." << endl);
467 BESDEBUG(
"cache",
"BESStoredDapResultCache::read_dap4_data_from_cache() - END" << endl);
470 unlock_and_close(cache_file_name );
// Terminate the debug line like every other BESDEBUG call in this file.
476 BESDEBUG(
"cache",
"BESStoredDapResultCache - The requested file does not exist. File: " + cache_file_name << endl);
484 "BESStoredDapResultCache::read_dap4_data_from_cache() - caught exception, unlocking cache and re-throw." << endl);
// Release the lock only if one was actually obtained.
486 if (fd != -1) unlock_and_close(cache_file_name );
491 #ifdef DAP2_STORED_RESULTS
// Builds a DDS from a cached DAP2 data-DDX file.
//   cache_file_name - path of the cached response to load
//   factory         - type factory used while the DDS is being populated
//   filename        - value stored as the DDS's filename after a good read
// NOTE(review): the return type, the return statements and the handling of
// a failed read are not visible in this excerpt. fdds is allocated with a
// raw new - confirm it is released on the failure path.
497 BESStoredDapResultCache::get_cached_dap2_data_ddx(
const string &cache_file_name, BaseTypeFactory *factory,
498 const string &filename)
501 "BESStoredDapResultCache::get_cached_dap2_data_ddx() - Reading cache for " << cache_file_name << endl);
503 DDS *fdds =
new DDS(factory);
505 if (read_dap2_data_from_cache(cache_file_name, fdds)) {
507 fdds->filename(filename);
510 BESDEBUG(
"cache",
"DDS Filename: " << fdds->filename() << endl);
511 BESDEBUG(
"cache",
"DDS Dataset name: " << fdds->get_dataset_name() << endl);
// Clear the DDS's factory pointer now that loading is finished.
513 fdds->set_factory(0);
// Flag every top-level variable as already read and as selected to send.
517 DDS::Vars_iter i = fdds->var_begin();
518 while (i != fdds->var_end()) {
519 (*i)->set_read_p(
true);
520 (*i++)->set_send_p(
true);
539 const string &filename)
542 "BESStoredDapResultCache::get_cached_dap4_data() - Reading cache for " << cache_file_name << endl);
544 DMR *fdmr =
new DMR(factory);
546 BESDEBUG(
"cache",
"BESStoredDapResultCache::get_cached_dap4_data() - DMR Filename: " << fdmr->filename() << endl);
547 fdmr->set_filename(filename);
549 if (read_dap4_data_from_cache(cache_file_name, fdmr)) {
551 "BESStoredDapResultCache::get_cached_dap4_data() - DMR Dataset name: " << fdmr->name() << endl);
553 fdmr->set_factory(0);
557 fdmr->root()->set_send_p(
true);
558 fdmr->root()->set_read_p(
true);
566 #ifdef DAP2_STORED_RESULTS
// Stores a DAP2 result in the cache as a multipart data-DDX file.
//   dds        - the result to store; its filename() and the constraint
//                determine the cache entry's id
//   constraint - constraint expression the result was produced with
//   rb         - response builder used to serialize the data-DDX
//   eval       - caller's evaluator; only referenced in the disabled
//                "#if 0" section below
// Throws InternalErr when the cache file cannot be opened for writing or
// when no lock of any kind can be obtained.
// NOTE(review): the try/catch structure, the declaration of fd and the
// return statement are not visible in this excerpt.
571 string BESStoredDapResultCache::store_dap2_result(DDS &dds,
const string &constraint,
BESDapResponseBuilder *rb,
572 ConstraintEvaluator *eval)
574 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - BEGIN" << endl);
576 BaseTypeFactory factory;
// Derive the entry's id and the cache file path from dataset + constraint.
580 string local_id = get_stored_result_local_id(dds.filename(), constraint, DAP_3_2);
581 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - local_id: "<< local_id << endl);
582 string cache_file_name = get_cache_file_name(local_id,
false);
583 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - cache_file_name: "<< cache_file_name << endl);
// Drop an entry that is_valid() rejects before trying to reuse it.
590 if (!is_valid(cache_file_name, dds.filename())) purge_file(cache_file_name);
// A successful read lock means the entry already exists.
592 if (get_read_lock(cache_file_name, fd)) {
594 "BESStoredDapResultCache::store_dap2_result() - Stored Result already exists. Not rewriting file: " << cache_file_name << endl);
// Otherwise create the file under an exclusive lock and write it.
596 else if (create_and_lock(cache_file_name, fd)) {
600 "BESStoredDapResultCache::store_dap2_result() - cache_file_name " << cache_file_name <<
", constraint: " << constraint << endl);
602 #if 0 // I shut this off because we know that the constraint and functions have already been evaluated - ndp
606 eval->parse_constraint(constraint, *fdds);
608 if (eval->function_clauses()) {
609 DDS *temp_fdds = eval->eval_function_clauses(*fdds);
615 ofstream data_stream(cache_file_name.c_str());
617 throw InternalErr(__FILE__, __LINE__,
618 "Could not open '" + cache_file_name +
"' to write cached response.");
620 string start =
"dataddx_cache_start", boundary =
"dataddx_cache_boundary";
// NOTE(review): this local evaluator shadows the 'eval' parameter.
624 ConstraintEvaluator eval;
628 dds.set_dap_version(
"3.2");
635 set_mime_multipart(data_stream, boundary, start, dods_data_ddx, x_plain,
// The serializer takes a DDS**. Give it the address of a genuine DDS*;
// casting &dds (a DDS*) to DDS** would make the callee read the DDS
// object's own bytes as if they were a pointer.
DDS *dds_ptr = &dds;
638 rb->serialize_dap2_data_ddx(data_stream, &dds_ptr, eval, boundary, start);
// Close the multipart document with the terminating boundary.
641 data_stream << CRLF <<
"--" << boundary <<
"--" << CRLF;
// Writing is done: downgrade to a shared lock so readers can proceed.
648 exclusive_to_shared_lock(fd);
// Account for the new file and purge if the cache has grown too large.
653 unsigned long long size = update_cache_info(cache_file_name);
654 if (cache_too_big(size)) update_and_purge(cache_file_name);
// If creation failed, a read lock obtained now means the entry exists.
659 else if (get_read_lock(cache_file_name, fd)) {
661 "BESStoredDapResultCache::store_dap2_result() - Stored Result already exists. Not rewriting file: " << cache_file_name << endl);
664 throw InternalErr(__FILE__, __LINE__,
665 "BESStoredDapResultCache::store_dap2_result() - Cache error during function invocation.");
669 "BESStoredDapResultCache::store_dap2_result() - unlocking and closing cache file "<< cache_file_name << endl);
670 unlock_and_close(cache_file_name);
674 "BESStoredDapResultCache::store_dap2_result() - caught exception, unlocking cache and re-throw." << endl);
680 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap2_result() - END (local_id=`"<< local_id <<
"')" << endl);
// Builds the cache-local id for a stored result.
//   dataset - dataset name; combined with the constraint to form the hash input
//   ce      - constraint expression
//   version - DAP version, which selects the file suffix
// The id is d_resultFilePrefix + hash(dataset + "#" + ce) + suffix.
// Throws BESInternalError for an unrecognized DAP version.
// NOTE(review): the switch on 'version', the declaration of suffix and the
// return statement are not visible in this excerpt.
692 string BESStoredDapResultCache::get_stored_result_local_id(
const string &dataset,
const string &ce,
693 libdap::DAPVersion version)
695 BESDEBUG(
"cache",
"get_stored_result_local_id() - BEGIN. dataset: " << dataset <<
", ce: " << ce << endl);
// Hash the combined name and render the hash value as text.
696 std::ostringstream ostr;
697 HASH_OBJ<std::string> str_hash;
698 string name = dataset +
"#" + ce;
699 ostr << str_hash(name);
700 string hashed_name = ostr.str();
701 BESDEBUG(
"cache",
"get_stored_result_local_id() - hashed_name: " << hashed_name << endl);
// The ".data_ddx" suffix is only compiled in when DAP2 stored results are enabled.
705 #ifdef DAP2_STORED_RESULTS
711 suffix =
".data_ddx";
719 throw BESInternalError(
"BESStoredDapResultCache::get_stored_result_local_id() - Unrecognized DAP version!!",
724 BESDEBUG(
"cache",
"get_stored_result_local_id() - Data file suffix: " << suffix << endl);
// Assemble the final id: prefix, hash, suffix.
726 string local_id = d_resultFilePrefix + hashed_name + suffix;
727 BESDEBUG(
"cache",
"get_stored_result_local_id() - file: " << local_id << endl);
731 BESDEBUG(
"cache",
"get_stored_result_local_id() - END. local_id: " << local_id << endl);
741 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - BEGIN" << endl);
743 BaseTypeFactory factory;
747 string local_id = get_stored_result_local_id(dmr.filename(), constraint, DAP_4_0);
748 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - local_id: "<< local_id << endl);
749 string cache_file_name = get_cache_file_name(local_id,
false);
750 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - cache_file_name: "<< cache_file_name << endl);
757 if (!is_valid(cache_file_name, dmr.filename())) {
759 "BESStoredDapResultCache::store_dap4_result() - File is not valid. Purging file from cache. filename: " << cache_file_name << endl);
760 purge_file(cache_file_name);
763 if (get_read_lock(cache_file_name, fd)) {
765 "BESStoredDapResultCache::store_dap4_result() - Stored Result already exists. Not rewriting file: " << cache_file_name << endl);
767 else if (create_and_lock(cache_file_name, fd)) {
771 "BESStoredDapResultCache::store_dap4_result() - cache_file_name: " << cache_file_name <<
", constraint: " << constraint << endl);
773 ofstream data_stream(cache_file_name.c_str());
775 throw InternalErr(__FILE__, __LINE__,
776 "Could not open '" + cache_file_name +
"' to write cached response.");
787 exclusive_to_shared_lock(fd);
792 unsigned long long size = update_cache_info(cache_file_name);
793 if (cache_too_big(size)) update_and_purge(cache_file_name);
797 else if (get_read_lock(cache_file_name, fd)) {
799 "BESStoredDapResultCache::store_dap4_result() - Couldn't create and lock file, But I got a read lock. " "Result may have been created by another process. " "Not rewriting file: " << cache_file_name << endl);
802 throw InternalErr(__FILE__, __LINE__,
803 "BESStoredDapResultCache::store_dap4_result() - Cache error during function invocation.");
807 "BESStoredDapResultCache::store_dap4_result() - unlocking and closing cache file "<< cache_file_name << endl);
808 unlock_and_close(cache_file_name);
812 "BESStoredDapResultCache::store_dap4_result() - caught exception, unlocking cache and re-throw." << endl);
818 BESDEBUG(
"cache",
"BESStoredDapResultCache::store_dap4_result() - END (local_id=`"<< local_id <<
"')" << endl);