libdap++  Updated for version 3.11.7
ResponseBuilder.cc
Go to the documentation of this file.
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of libdap, A C++ implementation of the OPeNDAP Data
4 // Access Protocol.
5 
6 // Copyright (c) 2011 OPeNDAP, Inc.
7 // Author: James Gallagher <jgallagher@opendap.org>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24 
25 #include "config.h"
26 
27 #include <signal.h>
28 #include <unistd.h>
29 #include <sys/stat.h>
30 #include <uuid/uuid.h> // used to build CID header value for data ddx
31 
32 #ifndef WIN32
33 #include <sys/wait.h>
34 #else
35 #include <io.h>
36 #include <fcntl.h>
37 #include <process.h>
38 #endif
39 
40 #include <iostream>
41 #include <string>
42 #include <sstream>
43 #include <fstream>
44 
45 #include <cstring>
46 #include <ctime>
47 
48 //#define DODS_DEBUG
49 
50 #if 0
51 //FIXME
52 #include "BaseType.h"
53 #include "Array.h"
54 #include "Grid.h"
55 #endif
56 #include "DAS.h"
57 #include "DDS.h"
58 //#include "Connect.h"
59 //#include "Response.h"
60 #include "DDXParserSAX2.h"
61 #include "Ancillary.h"
62 #include "ResponseBuilder.h"
63 #include "XDRStreamMarshaller.h"
64 #include "XDRFileUnMarshaller.h"
65 
66 #include "DAPCache3.h"
67 
68 #include "debug.h"
69 #include "mime_util.h" // for last_modified_time() and rfc_822_date()
70 #include "escaping.h"
71 #include "util.h"
72 
73 #ifndef WIN32
74 #include "SignalHandler.h"
75 #include "EventHandler.h"
76 #include "AlarmHandler.h"
77 #endif
78 
79 #define CRLF "\r\n" // Change here, expr-test.cc
80 #define FUNCTION_CACHE "/tmp/dap_functions_cache/"
81 #define FUNCTION_CACHE_PREFIX "f"
82 // Cache size in megabytes; 20,000M -> 20GB
83 #define FUNCTION_CACHE_SIZE 20000
84 
85 using namespace std;
86 
87 namespace libdap {
88 
89 ResponseBuilder::~ResponseBuilder() {
90 }
91 
94 void ResponseBuilder::initialize() {
95  // Set default values. Don't use the C++ constructor initialization so
96  // that a subclass can have more control over this process.
97  d_dataset = "";
98  d_ce = "";
99  d_btp_func_ce = "";
100  d_timeout = 0;
101 
102  d_default_protocol = DAP_PROTOCOL_VERSION;
103 
104  // Cache size is given in megabytes and later converted to bytes
105  // for internal use.
106  d_cache = 0;
107 
108  // Without this, the directory becomes a low-budget config param since
109  // the cache will only be used if the directory exists.
110  // TODO fix this mess by adding a real config param in bes.conf
111 #if 0
112  if (!dir_writable(FUNCTION_CACHE))
113  mkdir(FUNCTION_CACHE, 0777);
114 #endif
115 
116  if (dir_exists(FUNCTION_CACHE)) {
117  DBG(cerr << "the FUNCTION_CACHE directory (" << FUNCTION_CACHE <<") exists" << endl);
119  }
120  else {
121  DBG(cerr << "the FUNCTION_CACHE directory (" << FUNCTION_CACHE <<") does not exist - not caching" << endl);
122  }
123 
124 #ifdef WIN32
125  // We want serving from win32 to behave in a manner
126  // similar to the UNIX way - no CR->NL terminated lines
127  // in files. Hence stdout goes to binary mode.
128  _setmode(_fileno(stdout), _O_BINARY);
129 #endif
130 }
131 
138 string ResponseBuilder::get_ce() const {
139  return d_ce;
140 }
141 
142 void ResponseBuilder::set_ce(string _ce) {
143  d_ce = www2id(_ce, "%", "%20");
144 }
145 
154 string ResponseBuilder::get_dataset_name() const {
155  return d_dataset;
156 }
157 
158 void ResponseBuilder::set_dataset_name(const string ds) {
159  d_dataset = www2id(ds, "%", "%20");
160 }
161 
166 void ResponseBuilder::set_timeout(int t) {
167  d_timeout = t;
168 }
169 
171 int ResponseBuilder::get_timeout() const {
172  return d_timeout;
173 }
174 
185 void ResponseBuilder::establish_timeout(ostream &stream) const {
186 #ifndef WIN32
187  if (d_timeout > 0) {
188  SignalHandler *sh = SignalHandler::instance();
189  EventHandler *old_eh = sh->register_handler(SIGALRM, new AlarmHandler(stream));
190  delete old_eh;
191  alarm(d_timeout);
192  }
193 #endif
194 }
195 
203 void
204 ResponseBuilder::split_ce(ConstraintEvaluator &eval, const string &expr)
205 {
206  string ce;
207  if (!expr.empty())
208  ce = expr;
209  else
210  ce = d_ce;
211 
212  string btp_function_ce = "";
213  string::size_type pos = 0;
214  DBG(cerr << "ce: " << ce << endl);
215 
216  string::size_type first_paren = ce.find("(", pos);
217  string::size_type closing_paren = ce.find(")", pos);
218  while (first_paren != string::npos && closing_paren != string::npos) {
219  // Maybe a BTP function; get the name of the potential function
220  string name = ce.substr(pos, first_paren-pos);
221  DBG(cerr << "name: " << name << endl);
222  // is this a BTP function
223  btp_func f;
224  if (eval.find_function(name, &f)) {
225  // Found a BTP function
226  if (!btp_function_ce.empty())
227  btp_function_ce += ",";
228  btp_function_ce += ce.substr(pos, closing_paren+1-pos);
229  ce.erase(pos, closing_paren+1-pos);
230  if (ce[pos] == ',')
231  ce.erase(pos, 1);
232  }
233  else {
234  pos = closing_paren + 1;
235  // exception?
236  if (pos < ce.length() && ce.at(pos) == ',')
237  ++pos;
238  }
239 
240  first_paren = ce.find("(", pos);
241  closing_paren = ce.find(")", pos);
242  }
243 
244  DBG(cerr << "Modified constraint: " << ce << endl);
245  DBG(cerr << "BTP Function part: " << btp_function_ce << endl);
246 
247  d_ce = ce;
248  d_btp_func_ce = btp_function_ce;
249 }
250 
/**
 * Build the base name of the cache file for a dataset/function-CE pair.
 *
 * The dataset name and the constraint expression are joined with '#', and
 * every character in the set / ( ) , " ' is replaced by '#' so the result
 * can be used as a single file name component.
 *
 * @param dataset The dataset name (typically a pathname).
 * @param ce The server-function part of the constraint expression.
 * @return The mangled name.
 */
static std::string
build_cache_file_name(const std::string &dataset, const std::string &ce)
{
    // One set for every search, so that each of these characters is
    // replaced wherever it appears in the name.
    static const char unsafe_chars[] = "/(),\"\'";

    std::string name = dataset + "#" + ce;

    // Resume each search just past the previous hit instead of rescanning
    // the name from the start.
    for (std::string::size_type pos = name.find_first_of(unsafe_chars);
         pos != std::string::npos;
         pos = name.find_first_of(unsafe_chars, pos + 1)) {
        name[pos] = '#';
    }

    return name;
}
274 
#if 0
// Disabled helper: reports whether the named cache file can be opened for
// reading and is in a usable stream state immediately after opening.
static bool cached_data_ddx_exists(const string &cache_file_name)
{
    ifstream probe(cache_file_name.c_str()); // closed when 'probe' goes out of scope

    if (probe.fail() || probe.bad() || probe.eof())
        return false;

    return true;
}
#endif
283 
292 bool ResponseBuilder::is_valid(const string &cache_file_name)
293 {
294  // If the cached response is zero bytes in size, it's not valid.
295  // (hmmm...)
296 
297  off_t entry_size = 0;
298  time_t entry_time = 0;
299  struct stat buf;
300  if (stat(cache_file_name.c_str(), &buf) == 0) {
301  entry_size = buf.st_size;
302  entry_time = buf.st_mtime;
303  }
304  else {
305  return false;
306  }
307 
308  if (entry_size == 0)
309  return false;
310 
311  time_t dataset_time = entry_time;
312  if (stat(d_dataset.c_str(), &buf) == 0) {
313  dataset_time = buf.st_mtime;
314  }
315 
316  // Trick: if the d_dataset is not a file, stat() returns error and
317  // the times stay equal and the code uses the cache entry.
318 
319  // TODO Fix this so that the code can get a LMT from the correct
320  // handler.
321  if (dataset_time > entry_time)
322  return false;
323 
324  return true;
325 }
326 
340 DDS *ResponseBuilder::read_cached_dataset(DDS &dds, ConstraintEvaluator &eval,
341  string &cache_token)
342 {
343  DBG(cerr << "Found function(s) in CE: " << d_btp_func_ce << endl);
344 
345  // These are used for the cached or newly created DDS object
346  BaseTypeFactory factory;
347  DDS *fdds;
348 
349  // Get the cache filename for this thing. Do not use the default
350  // name mangling; instead use what build_cache_file_name() does.
351  string cache_file_name = d_cache->get_cache_file_name(build_cache_file_name(d_dataset, d_btp_func_ce), false);
352  int fd;
353  try {
354  // If the object in the cache is not valid, remove it. The read_lock will
355  // then fail and the code will drop down to the create_and_lock() call.
356  // is_valid() tests for a non-zero object and for d_dateset newer than
357  // the cached object.
358  if (!is_valid(cache_file_name))
359  d_cache->purge_file(cache_file_name);
360 
361  if (d_cache->get_read_lock(cache_file_name, fd)) {
362  DBG(cerr << "function ce - cached hit: " << cache_file_name << endl );
363  fdds = get_cached_data_ddx(cache_file_name, &factory);
364  }
365 
366  // If here, the cache_file_name could not be locked for read access;
367  // try to build it. First make an empty file and get an exclusive lock on it.
368  // TODO Make this an 'else if'?
369  if (d_cache->create_and_lock(cache_file_name, fd)) {
370  DBG(cerr << "function ce - caching " << cache_file_name << endl );
371 
372  eval.parse_constraint(d_btp_func_ce, dds);
373  fdds = eval.eval_function_clauses(dds);
374 
375  // TODO cache it using fd. Since this is advisory locking, this will work...
376  // Improve?
377  cache_data_ddx(cache_file_name, *fdds);
378 
379  // Change the exclusive lock on the new file to a shared lock. This keeps
380  // other processes from purging the new file and ensures that the reading
381  // process can use it.
382  d_cache->exclusive_to_shared_lock(fd);
383 
384  // Now update the total cache size info and purge if needed. The new file's
385  // name is passed into the purge method because this process cannot detect its
386  // own lock on the file.
387  unsigned long long size = d_cache->update_cache_info(cache_file_name);
388  if (d_cache->cache_too_big(size))
389  d_cache->update_and_purge(cache_file_name);
390  }
391  // get_read_lock() returns immediately if the file does not exist,
392  // but blocks waiting to get a shared lock if the file does exist.
393  else if (d_cache->get_read_lock(cache_file_name, fd)) {
394  DBG(cerr << "function ce - cached hit: " << cache_file_name << endl );
395  fdds = get_cached_data_ddx(cache_file_name, &factory);
396  }
397  else {
398  throw InternalErr(__FILE__, __LINE__, "Cache error during function invocation.");
399  }
400  }
401  catch (...) {
402  DBG(cerr << "caught exception, unlocking cache and re-throw." << endl );
403  // I think this call is not needed. jhrg 10/23/12
404  d_cache->unlock_cache();
405  throw;
406  }
407 
408  cache_token = cache_file_name; // Set this value-result parameter
409  return fdds;
410 }
411 
422 void ResponseBuilder::send_das(ostream &out, DAS &das, bool with_mime_headers)
423 {
424  if (with_mime_headers)
425  set_mime_text(out, dods_das, x_plain, last_modified_time(d_dataset), "2.0");
426 
427  das.print(out);
428 
429  out << flush;
430 }
431 
447 void ResponseBuilder::send_das(ostream &out, DDS &dds, ConstraintEvaluator &eval, bool constrained,
448  bool with_mime_headers)
449 {
450  // Set up the alarm.
451  establish_timeout(out);
452  dds.set_timeout(d_timeout);
453 
454  if (!constrained) {
455  if (with_mime_headers)
456  set_mime_text(out, dods_das, x_plain, last_modified_time(d_dataset), "2.0");
457 
458  dds.print_das(out);
459  out << flush;
460 
461  return;
462  }
463 
464  split_ce(eval);
465 
466  // If there are functions, parse them and eval.
467  // Use that DDS and parse the non-function ce
468  // Serialize using the second ce and the second dds
469  if (!d_btp_func_ce.empty()) {
470 #if 0
471  DBG(cerr << "Found function(s) in CE: " << d_btp_func_ce << endl);
472 
473  // These are used for the cached or newly created DDS object
474  BaseTypeFactory factory;
475  DDS *fdds;
476 
477  // Get the cache filename for this thing. Do not use the default
478  // name mangling; instead use what build_cache_file_name() does.
479  string cache_file_name = d_cache->get_cache_file_name(build_cache_file_name(d_dataset, d_btp_func_ce), false);
480  int fd;
481  try {
482  // If the object in the cache is not valid, remove it. The read_lock will
483  // then fail and the code will drop down to the create_and_lock() call.
484  // is_valid() tests for a non-zero object and for d_dateset newer than
485  // the cached object.
486  if (!is_valid(cache_file_name))
487  d_cache->purge_file(cache_file_name);
488 
489  if (d_cache->get_read_lock(cache_file_name, fd)) {
490  DBG(cerr << "function ce - cached hit: " << cache_file_name << endl );
491  fdds = get_cached_data_ddx(cache_file_name, &factory);
492  }
493 
494  // If here, the cache_file_name could not be locked for read access;
495  // try to build it. First make an empty file and get an exclusive lock on it.
496  // TODO Make this an 'else if'?
497  if (d_cache->create_and_lock(cache_file_name, fd)) {
498  DBG(cerr << "function ce - caching " << cache_file_name << endl );
499 
500  eval.parse_constraint(d_btp_func_ce, dds);
501  fdds = eval.eval_function_clauses(dds);
502 
503  // TODO cache it using fd. Since this is advisory locking, this will work...
504  // Improve?
505  cache_data_ddx(cache_file_name, *fdds);
506 
507  // Change the exclusive lock on the new file to a shared lock. This keeps
508  // other processes from purging the new file and ensures that the reading
509  // process can use it.
510  d_cache->exclusive_to_shared_lock(fd);
511 
512  // Now update the total cache size info and purge if needed. The new file's
513  // name is passed into the purge method because this process cannot detect its
514  // own lock on the file.
515  unsigned long long size = d_cache->update_cache_info(cache_file_name);
516  if (d_cache->cache_too_big(size))
517  d_cache->update_and_purge(cache_file_name);
518  }
519  else if (d_cache->get_read_lock(cache_file_name, fd)) {
520  DBG(cerr << "function ce - cached hit: " << cache_file_name << endl );
521  fdds = get_cached_data_ddx(cache_file_name, &factory);
522  }
523  else {
524  throw InternalErr(__FILE__, __LINE__, "Cache error during function invocation.");
525  }
526  }
527  catch (...) {
528  DBG(cerr << "caught exception, unlocking cache and re-throw." << endl );
529  // I think this call is not needed. jhrg 10/23/12
530  d_cache->unlock_cache();
531  throw;
532  }
533 #endif
534  DDS *fdds = 0;
535  string cache_token = "";
536 
537  if (d_cache) {
538  DBG(cerr << "Using the cache for the server function CE" << endl);
539  fdds = read_cached_dataset(dds, eval, cache_token);
540  }
541  else {
542  DBG(cerr << "Cache not found; (re)calculating" << endl);
543  eval.parse_constraint(d_btp_func_ce, dds);
544  fdds = eval.eval_function_clauses(dds);
545  }
546 
547  if (with_mime_headers)
549 
550  fdds->print_das(out);
551 
552  if (d_cache)
553  d_cache->unlock_and_close(cache_token);
554 
555  delete fdds;
556  }
557  else {
558  DBG(cerr << "Simple constraint" << endl);
559 
560  eval.parse_constraint(d_ce, dds); // Throws Error if the ce doesn't parse.
561 
562  if (with_mime_headers)
564 
565  dds.print_das(out);
566  }
567 
568  out << flush;
569 }
570 
588 void ResponseBuilder::send_dds(ostream &out, DDS &dds, ConstraintEvaluator &eval, bool constrained,
589  bool with_mime_headers)
590 {
591  if (!constrained) {
592  if (with_mime_headers)
594 
595  dds.print(out);
596  out << flush;
597  return;
598  }
599 
600  // Set up the alarm.
601  establish_timeout(out);
602  dds.set_timeout(d_timeout);
603 
604  // Split constraint into two halves
605  split_ce(eval);
606 
607  // If there are functions, parse them and eval.
608  // Use that DDS and parse the non-function ce
609  // Serialize using the second ce and the second dds
610  if (!d_btp_func_ce.empty()) {
611 #if 0
612  DBG(cerr << "Found function(s) in CE: " << d_btp_func_ce << endl);
613 
614  // These are used for the cached or newly created DDS object
615  BaseTypeFactory factory;
616  DDS *fdds;
617 
618  // Get the cache filename for this thing. Do not use the default
619  // name mangling; instead use what build_cache_file_name() does.
620  string cache_file_name = d_cache->get_cache_file_name(build_cache_file_name(d_dataset, d_btp_func_ce), false);
621  int fd;
622  try {
623  // If the object in the cache is not valid, remove it. The read_lock will
624  // then fail and the code will drop down to the create_and_lock() call.
625  // is_valid() tests for a non-zero object and for d_dateset newer than
626  // the cached object.
627  if (!is_valid(cache_file_name))
628  d_cache->purge_file(cache_file_name);
629 
630  if (d_cache->get_read_lock(cache_file_name, fd)) {
631  DBG(cerr << "function ce - cached hit: " << cache_file_name << endl );
632  fdds = get_cached_data_ddx(cache_file_name, &factory);
633  }
634 
635  // If here, the cache_file_name could not be locked for read access;
636  // try to build it. First make an empty file and get an exclusive lock on it.
637  if (d_cache->create_and_lock(cache_file_name, fd)) {
638  DBG(cerr << "function ce - caching " << cache_file_name << endl );
639 
640  eval.parse_constraint(d_btp_func_ce, dds);
641  fdds = eval.eval_function_clauses(dds);
642 
643  // TODO cache it using fd. Since this is advisory locking, this will work...
644  // Improve?
645  cache_data_ddx(cache_file_name, *fdds);
646 
647  // Change the exclusive lock on the new file to a shared lock. This keeps
648  // other processes from purging the new file and ensures that the reading
649  // process can use it.
650  d_cache->exclusive_to_shared_lock(fd);
651 
652  // Now update the total cache size info and purge if needed. The new file's
653  // name is passed into the purge method because this process cannot detect its
654  // own lock on the file.
655  unsigned long long size = d_cache->update_cache_info(cache_file_name);
656  if (d_cache->cache_too_big(size))
657  d_cache->update_and_purge(cache_file_name);
658  }
659  else if (d_cache->get_read_lock(cache_file_name, fd)) {
660  DBG(cerr << "function ce - cached hit: " << cache_file_name << endl );
661  fdds = get_cached_data_ddx(cache_file_name, &factory);
662  }
663  else {
664  throw InternalErr(__FILE__, __LINE__, "Cache error during function invocation.");
665  }
666  }
667  catch (...) {
668  DBG(cerr << "caught exception, unlocking cache and re-throw." << endl );
669  // I think this call is not needed. jhrg 10/23/12
670  d_cache->unlock_cache();
671  throw;
672  }
673 #endif
674  string cache_token = "";
675  DDS *fdds = 0;
676 
677  if (d_cache) {
678  DBG(cerr << "Using the cache for the server function CE" << endl);
679  fdds = read_cached_dataset(dds, eval, cache_token);
680  }
681  else {
682  DBG(cerr << "Cache not found; (re)calculating" << endl);
683  eval.parse_constraint(d_btp_func_ce, dds);
684  fdds = eval.eval_function_clauses(dds);
685  }
686 
687  // Server functions might mark variables to use their read()
688  // methods. Clear that so the CE in d_ce will control what is
689  // sent. If that is empty (there was only a function call) all
690  // of the variables in the intermediate DDS (i.e., the function
691  // result) will be sent.
692  fdds->mark_all(false);
693 
694  eval.parse_constraint(d_ce, *fdds);
695 
696  if (with_mime_headers)
698 
699  fdds->print_constrained(out);
700 
701  if (d_cache)
702  d_cache->unlock_and_close(cache_token);
703 
704  delete fdds;
705  }
706  else {
707  DBG(cerr << "Simple constraint" << endl);
708 
709  eval.parse_constraint(d_ce, dds); // Throws Error if the ce doesn't parse.
710 
711  if (with_mime_headers)
713 
714  dds.print_constrained(out);
715  }
716 
717  out << flush;
718 }
719 
720 void ResponseBuilder::dataset_constraint(ostream &out, DDS & dds, ConstraintEvaluator & eval, bool ce_eval) const {
721  // send constrained DDS
722  DBG(cerr << "Inside dataset_constraint" << endl);
723  dds.print_constrained(out);
724  out << "Data:\n";
725  out << flush;
726 #ifdef CHECKSUMS
727  // Grab a stream that encodes using XDR.
728  XDRStreamMarshaller m(out, true, true);
729 #else
730  XDRStreamMarshaller m(out, false, true);
731 #endif
732  DBG(cerr << "Built stream encoder" << endl);
733  try {
734  // Send all variables in the current projection (send_p())
735  for (DDS::Vars_iter i = dds.var_begin(); i != dds.var_end(); i++)
736  if ((*i)->send_p()) {
737  DBG(cerr << "Sending " << (*i)->name() << endl);
738 #ifdef CHECKSUMS
739  if ((*i)->type() != dods_structure_c && (*i)->type() != dods_grid_c)
740  m.reset_checksum();
741 
742  (*i)->serialize(eval, dds, m, ce_eval);
743 
744  if ((*i)->type() != dods_structure_c && (*i)->type() != dods_grid_c)
745  cerr << (*i)->name() << ": " << m.get_checksum() << endl;
746 #else
747  (*i)->serialize(eval, dds, m, ce_eval);
748 #endif
749  }
750  }
751  catch (Error & e) {
752  throw;
753  }
754 }
755 
756 void ResponseBuilder::dataset_constraint_ddx(ostream &out, DDS & dds, ConstraintEvaluator & eval, const string &boundary, const string &start, bool ce_eval) const
757 {
758  // Write the MPM headers for the DDX (text/xml) part of the response
759  set_mime_ddx_boundary(out, boundary, start, dap4_ddx);
760 
761  // Make cid
762  uuid_t uu;
763  uuid_generate(uu);
764  char uuid[37];
765  uuid_unparse(uu, &uuid[0]);
766  char domain[256];
767  if (getdomainname(domain, 255) != 0 || strlen(domain) == 0)
768  strncpy(domain, "opendap.org", 255);
769 
770  string cid = string(&uuid[0]) + "@" + string(&domain[0]);
771 
772  // Send constrained DDX with a data blob reference
773  dds.print_xml(out, true, cid);
774 
775  // Write the MPM headers for the data part of the response.
776  set_mime_data_boundary(out, boundary, cid, dap4_data, binary);
777 
778  // Grab a stream that encodes using XDR.
779  XDRStreamMarshaller m(out);
780 
781  // TODO Remove useless try/catch
782  try {
783  // Send all variables in the current projection (send_p())
784  for (DDS::Vars_iter i = dds.var_begin(); i != dds.var_end(); i++) {
785  if ((*i)->send_p()) {
786  DBG(cerr << "Sending " << (*i)->name() << endl);
787  (*i)->serialize(eval, dds, m, ce_eval);
788  }
789  }
790  }
791  catch (Error & e) {
792  throw;
793  }
794 }
795 
812 void ResponseBuilder::send_data(ostream & data_stream, DDS & dds, ConstraintEvaluator & eval, bool with_mime_headers)
813 {
814  // Set up the alarm.
815  establish_timeout(data_stream);
816  dds.set_timeout(d_timeout);
817 
818 #if 0
819  eval.parse_constraint(d_ce, dds); // Throws Error if the ce doesn't parse.
820 
821  dds.tag_nested_sequences(); // Tag Sequences as Parent or Leaf node.
822 
823  if (dds.get_response_limit() != 0 && dds.get_request_size(true) > dds.get_response_limit()) {
824  string msg = "The Request for " + long_to_string(dds.get_request_size(true) / 1024)
825  + "KB is too large; requests for this user are limited to "
826  + long_to_string(dds.get_response_limit() / 1024) + "KB.";
827  throw Error(msg);
828  }
829 #endif
830 
831  // Split constraint into two halves
832  split_ce(eval);
833 
834  // If there are functions, parse them and eval.
835  // Use that DDS and parse the non-function ce
836  // Serialize using the second ce and the second dds
837  if (!d_btp_func_ce.empty()) {
838  DBG(cerr << "Found function(s) in CE: " << d_btp_func_ce << endl);
839 #if 0
840  // These are used for the cached or newly created DDS object
841  BaseTypeFactory factory;
842  DDS *fdds;
843 
844  // Get the cache filename for this thing. Do not use the default
845  // name mangling; instead use what build_cache_file_name() does.
846  string cache_file_name = d_cache->get_cache_file_name(build_cache_file_name(d_dataset, d_btp_func_ce), false);
847  int fd;
848  try {
849  // If the object in the cache is not valid, remove it. The read_lock will
850  // then fail and the code will drop down to the create_and_lock() call.
851  // is_valid() tests for a non-zero object and for d_dateset newer than
852  // the cached object.
853  if (!is_valid(cache_file_name))
854  d_cache->purge_file(cache_file_name);
855 
856  if (d_cache->get_read_lock(cache_file_name, fd)) {
857  DBG(cerr << "function ce - cached hit: " << cache_file_name << endl );
858  fdds = get_cached_data_ddx(cache_file_name, &factory);
859  }
860 
861  // If here, the cache_file_name could not be locked for read access;
862  // try to build it. First make an empty file and get an exclusive lock on it.
863  if (d_cache->create_and_lock(cache_file_name, fd)) {
864  DBG(cerr << "function ce - caching " << cache_file_name << endl );
865 
866  eval.parse_constraint(d_btp_func_ce, dds);
867  fdds = eval.eval_function_clauses(dds);
868 
869  // TODO cache it using fd. Since this is advisory locking, this will work...
870  // Improve?
871  // Until Connect/Response support working with file descriptors, it's
872  // better to use the names.
873  cache_data_ddx(cache_file_name, *fdds);
874 
875  // Change the exclusive lock on the new file to a shared lock. This keeps
876  // other processes from purging the new file and ensures that the reading
877  // process can use it.
878  d_cache->exclusive_to_shared_lock(fd);
879 
880  // Now update the total cache size info and purge if needed. The new file's
881  // name is passed into the purge method because this process cannot detect its
882  // own lock on the file.
883  unsigned long long size = d_cache->update_cache_info(cache_file_name);
884  if (d_cache->cache_too_big(size))
885  d_cache->update_and_purge(cache_file_name);
886  }
887  else if (d_cache->get_read_lock(cache_file_name, fd)) {
888  DBG(cerr << "function ce - cached hit: " << cache_file_name << endl );
889  fdds = get_cached_data_ddx(cache_file_name, &factory);
890  }
891  else {
892  throw InternalErr(__FILE__, __LINE__, "Cache error during function invocation.");
893  }
894  }
895  catch (...) {
896  DBG(cerr << "caught exception, unlocking cache and re-throw." << endl );
897  // I think this call is not needed. jhrg 10/23/12
898  d_cache->unlock_cache();
899  throw;
900  }
901 #endif
902 #if 0
903  // ******** original code here ***********
904 
905  // Check to see if the cached data ddx exists and is valid
906  if (cached_data_ddx_exists(cache_file_name)) {
907  fdds = get_cached_data_ddx(cache_file_name, &factory);
908 #if 0
909  // Use the cache file and don't eval the function(s)
910  DBG(cerr << "Reading cache for " << d_dataset + "?" + d_btp_func_ce << endl);
911  icache_file.close(); // only opened to see if it's there; Connect/Response do their own thing
912 
913  fdds = new DDS(&factory);
914  fdds->set_dap_version("4.0"); // TODO note about cid, ...
915  // FIXME name should be...
916  fdds->filename( d_dataset ) ;
917  fdds->set_dataset_name( name_path( d_dataset ) ) ;
918 
919  Connect *url = new Connect( d_dataset ) ;
920  Response *r = new Response( fopen( cache_file_name.c_str(), "r" ), 0 ) ;
921  if( !r->get_stream() )
922  throw Error("The input source: " + cache_file_name + " could not be opened");
923 
924  url->read_data( *fdds, r ) ;
925  fdds->set_factory( 0 ) ;
926 
927  // mark everything as read.
928  DDS::Vars_iter i = fdds->var_begin() ;
929  DDS::Vars_iter e = fdds->var_end() ;
930  for( ; i != e; i++ ) {
931  BaseType *b = (*i) ;
932  b->set_read_p( true ) ;
933  }
934  // for_each(dds->var_begin(), dds->var_end(), mfunc(BaseType::set_read_p));
935 
936  DAS *das = new DAS ;
937  Ancillary::read_ancillary_das( *das, d_dataset ) ;
938  fdds->transfer_attributes( das ) ;
939 #endif
940  }
941  else {
942  eval.parse_constraint(d_btp_func_ce, dds);
943  fdds = eval.eval_function_clauses(dds);
944 
945  cache_data_ddx(cache_file_name, *fdds);
946 #if 0
947  // TODO cache the fdds here
948  ofstream ocache_file(cache_file_name.c_str());
949 
950  DBG(cerr << "Caching " << d_dataset + "?" + d_btp_func_ce << endl);
951  cache_data_ddx(ocache_file, *fdds);
952  ocache_file.close();
953 #endif
954  }
955 #endif
956  string cache_token = "";
957  DDS *fdds = 0;
958 
959  if (d_cache) {
960  DBG(cerr << "Using the cache for the server function CE" << endl);
961  fdds = read_cached_dataset(dds, eval, cache_token);
962  }
963  else {
964  DBG(cerr << "Cache not found; (re)calculating" << endl);
965  eval.parse_constraint(d_btp_func_ce, dds);
966  fdds = eval.eval_function_clauses(dds);
967  }
968 
969  DBG(cerr << "Intermediate DDS: " << endl);
970  DBG(fdds->print_constrained(cerr));
971 
972  DBG(cerr << "Parsing remaining CE: " << d_ce << endl);
973 
974  // Server functions might mark variables to use their read()
975  // methods. Clear that so the CE in d_ce will control what is
976  // sent. If that is empty (there was only a function call) all
977  // of the variables in the intermediate DDS (i.e., the function
978  // result) will be sent.
979  fdds->mark_all(false);
980 
981  eval.parse_constraint(d_ce, *fdds);
982 
983  fdds->tag_nested_sequences(); // Tag Sequences as Parent or Leaf node.
984 
985  if (fdds->get_response_limit() != 0 && fdds->get_request_size(true) > fdds->get_response_limit()) {
986  string msg = "The Request for " + long_to_string(dds.get_request_size(true) / 1024)
987  + "KB is too large; requests for this user are limited to "
988  + long_to_string(dds.get_response_limit() / 1024) + "KB.";
989  throw Error(msg);
990  }
991 
992  if (with_mime_headers)
993  set_mime_binary(data_stream, dods_data, x_plain, last_modified_time(d_dataset), dds.get_dap_version());
994 
995  DBG(cerr << "About to call dataset_constraint" << endl);
996  dataset_constraint(data_stream, *fdds, eval, false);
997 
998  if (d_cache)
999  d_cache->unlock_and_close(cache_token);
1000 
1001  delete fdds;
1002  }
1003  else {
1004  DBG(cerr << "Simple constraint" << endl);
1005 
1006  eval.parse_constraint(d_ce, dds); // Throws Error if the ce doesn't parse.
1007 
1008  dds.tag_nested_sequences(); // Tag Sequences as Parent or Leaf node.
1009 
1010  if (dds.get_response_limit() != 0 && dds.get_request_size(true) > dds.get_response_limit()) {
1011  string msg = "The Request for " + long_to_string(dds.get_request_size(true) / 1024)
1012  + "KB is too large; requests for this user are limited to "
1013  + long_to_string(dds.get_response_limit() / 1024) + "KB.";
1014  throw Error(msg);
1015  }
1016 
1017  if (with_mime_headers)
1018  set_mime_binary(data_stream, dods_data, x_plain, last_modified_time(d_dataset), dds.get_dap_version());
1019 
1020  dataset_constraint(data_stream, dds, eval);
1021  }
1022 
1023 #if 0
1024  // Start sending the response...
1025 
1026  // Handle *functional* constraint expressions specially
1027  if (eval.function_clauses()) {
1028  DDS *fdds = eval.eval_function_clauses(dds);
1029  if (with_mime_headers)
1030  set_mime_binary(data_stream, dods_data, x_plain, last_modified_time(d_dataset), dds.get_dap_version());
1031 
1032  dataset_constraint(data_stream, *fdds, eval, false);
1033  delete fdds;
1034  }
1035  else {
1036  if (with_mime_headers)
1037  set_mime_binary(data_stream, dods_data, x_plain, last_modified_time(d_dataset), dds.get_dap_version());
1038 
1039  dataset_constraint(data_stream, dds, eval);
1040  }
1041 #endif
1042 
1043  data_stream << flush;
1044 }
1045 
1060 void ResponseBuilder::send_ddx(ostream &out, DDS &dds, ConstraintEvaluator &eval, bool with_mime_headers)
1061 {
1062  // If constrained, parse the constraint. Throws Error or InternalErr.
1063  if (!d_ce.empty())
1064  eval.parse_constraint(d_ce, dds);
1065 
1066  if (eval.functional_expression())
1067  throw Error("Function calls can only be used with data requests. To see the structure of the underlying data source, reissue the URL without the function.");
1068 
1069  if (with_mime_headers)
1071 
1072  dds.print_xml_writer(out, !d_ce.empty(), "");
1073 }
1074 
1095 void ResponseBuilder::send_data_ddx(ostream & data_stream, DDS & dds, ConstraintEvaluator & eval, const string &start, const string &boundary, bool with_mime_headers)
1096 {
1097 
1098  // Set up the alarm.
1099  establish_timeout(data_stream);
1100  dds.set_timeout(d_timeout);
1101 
1102  eval.parse_constraint(d_ce, dds); // Throws Error if the ce doesn't parse.
1103 
1104  if (dds.get_response_limit() != 0 && dds.get_request_size(true) > dds.get_response_limit()) {
1105  string msg = "The Request for " + long_to_string(dds.get_request_size(true) / 1024) + "KB is too large; requests for this user are limited to " + long_to_string(
1106  dds.get_response_limit() / 1024) + "KB.";
1107  throw Error(msg);
1108  }
1109 
1110  dds.tag_nested_sequences(); // Tag Sequences as Parent or Leaf node.
1111 
1112  // Start sending the response...
1113 
1114  // Handle *functional* constraint expressions specially
1115  if (eval.function_clauses()) {
1116  DDS *fdds = eval.eval_function_clauses(dds);
1117  if (with_mime_headers)
1118  set_mime_multipart(data_stream, boundary, start, dap4_data_ddx, x_plain, last_modified_time(d_dataset));
1119  data_stream << flush;
1120  // TODO: Change this to dataset_constraint_ddx()
1121  dataset_constraint(data_stream, *fdds, eval, false);
1122  delete fdds;
1123  }
1124  else {
1125  if (with_mime_headers)
1126  set_mime_multipart(data_stream, boundary, start, dap4_data_ddx, x_plain, last_modified_time(d_dataset));
1127  data_stream << flush;
1128  dataset_constraint_ddx(data_stream, dds, eval, boundary, start);
1129  }
1130 
1131  data_stream << flush;
1132 
1133  if (with_mime_headers)
1134  data_stream << CRLF << "--" << boundary << "--" << CRLF;
1135 }
1136 
1149 void ResponseBuilder::cache_data_ddx(const string &cache_file_name, DDS &dds)
1150 {
1151  DBG(cerr << "Caching " << d_dataset + "?" + d_btp_func_ce << endl);
1152 
1153  ofstream data_stream(cache_file_name.c_str());
1154  // Test for a valid file open
1155 
1156  string start="dataddx_cache_start", boundary="dataddx_cache_boundary";
1157 #if 1
1158  // Does this really need the full set of MIME headers? Not including these
1159  // might make it comparible with the dapreader module in the BES.
1160  set_mime_multipart(data_stream, boundary, start, dap4_data_ddx, x_plain, last_modified_time(d_dataset));
1161  data_stream << flush;
1162 #endif
1163 
1164  // dataset_constraint_ddx() needs a ConstraintEvaluator because
1165  // it calls serialize().
1166  ConstraintEvaluator eval;
1167 
1168  // Setting the DDS version to 3.2 causes the print_xml() code
1169  // to write out a 'blob' element with a valid cid. The reader
1170  // code in Connect needs this (or thinks it does...)
1171  dds.set_dap_version("3.2");
1172 
1173  dataset_constraint_ddx(data_stream, dds, eval, boundary, start);
1174  data_stream << flush;
1175 
1176  data_stream << CRLF << "--" << boundary << "--" << CRLF;
1177  data_stream.close();
1178 }
1179 
1198 void ResponseBuilder::read_data_from_cache(FILE *data, DDS *fdds)
1199 {
1200  // Rip off the MIME headers from the response if they are present
1201  string mime = get_next_mime_header(data);
1202  while (!mime.empty()) {
1203 #if 0
1204  string header, value;
1205  parse_mime_header(mime, header, value);
1206 #endif
1207  mime = get_next_mime_header(data);
1208  }
1209 
1210  // Parse the DDX; throw an exception on error.
1211  DDXParser ddx_parser(fdds->get_factory());
1212 
1213  // Read the MPM boundary and then read the subsequent headers
1214  string boundary = read_multipart_boundary(data);
1215  DBG(cerr << "MPM Boundary: " << boundary << endl);
1216 
1217  read_multipart_headers(data, "text/xml", dap4_ddx);
1218 
1219  // Parse the DDX, reading up to and including the next boundary.
1220  // Return the CID for the matching data part
1221  string data_cid;
1222  ddx_parser.intern_stream(data, fdds, data_cid, boundary);
1223 
1224  // Munge the CID into something we can work with
1225  data_cid = cid_to_header_value(data_cid);
1226  DBG(cerr << "Data CID: " << data_cid << endl);
1227 
1228  // Read the data part's MPM part headers (boundary was read by
1229  // DDXParse::intern)
1230  read_multipart_headers(data, "application/octet-stream", dap4_data, data_cid);
1231 
1232  // Now read the data
1233 
1234  XDRFileUnMarshaller um(data);
1235  for (DDS::Vars_iter i = fdds->var_begin(); i != fdds->var_end(); i++) {
1236  (*i)->deserialize(um, fdds);
1237  }
1238 }
1239 
1243 DDS *
1244 ResponseBuilder::get_cached_data_ddx(const string &cache_file_name, BaseTypeFactory *factory)
1245 {
1246  DBG(cerr << "Reading cache for " << d_dataset + "?" + d_btp_func_ce << endl);
1247 
1248  DDS *fdds = new DDS(factory);
1249 
1250  fdds->filename( d_dataset ) ;
1251  fdds->set_dataset_name( "function_result_" + name_path( d_dataset ) ) ;
1252 
1253 #if 0
1254  Connect *url = new Connect( d_dataset ) ;
1255  Response *r = new Response( fopen( cache_file_name.c_str(), "r" ), 0 ) ;
1256  if( !r->get_stream() )
1257  throw Error("The input source: " + cache_file_name + " could not be opened");
1258 
1259  url->read_data( *fdds, r ) ;
1260 #endif
1261 
1262  // fstream data(cache_file_name.c_str());
1263  FILE *data = fopen( cache_file_name.c_str(), "r" );
1264  read_data_from_cache(data, fdds);
1265  fclose(data);
1266 
1267  fdds->set_factory( 0 ) ;
1268 
1269  // mark everything as read.
1270  DDS::Vars_iter i = fdds->var_begin() ;
1271  DDS::Vars_iter e = fdds->var_end() ;
1272  for( ; i != e; i++ ) {
1273  BaseType *b = (*i) ;
1274  b->set_read_p( true ) ;
1275  }
1276 
1277  // for_each(dds->var_begin(), dds->var_end(), mfunc(BaseType::set_read_p));
1278 
1279 #if 0
1280  // Ancillary attributes were read when the DDX was built and are part of the
1281  // cached BLOB.
1282  DAS *das = new DAS ;
1283  Ancillary::read_ancillary_das( *das, d_dataset ) ;
1284  fdds->transfer_attributes( das ) ;
1285 #endif
1286  return fdds;
1287 }
1288 
// Text used for the Content-Description header. The set_mime_*() functions
// index this table with an ObjectType value, so the entries must stay in the
// same order as that type's enumerators (declared elsewhere).
static const char *descrip[] = {
    "unknown",
    "dods_das",
    "dods_dds",
    "dods_data",
    "dods_error",
    "web_error",
    "dap4-ddx",
    "dap4-data",
    "dap4-error",
    "dap4-data-ddx",
    "dods_ddx"
};

// Text used for the Content-Encoding header. Indexed with an EncodingType
// value; keep the order in step with that type's enumerators.
static const char *encoding[] = {
    "unknown",
    "deflate",
    "x-plain",
    "gzip",
    "binary"
};
1292 
1305 void ResponseBuilder::set_mime_text(ostream &strm, ObjectType type, EncodingType enc, const time_t last_modified, const string &protocol) const
1306 {
1307  strm << "HTTP/1.0 200 OK" << CRLF;
1308 
1309  strm << "XDODS-Server: " << DVR << CRLF;
1310  strm << "XOPeNDAP-Server: " << DVR << CRLF;
1311 
1312  if (protocol == "")
1313  strm << "XDAP: " << d_default_protocol << CRLF;
1314  else
1315  strm << "XDAP: " << protocol << CRLF;
1316 
1317  const time_t t = time(0);
1318  strm << "Date: " << rfc822_date(t).c_str() << CRLF;
1319 
1320  strm << "Last-Modified: ";
1321  if (last_modified > 0)
1322  strm << rfc822_date(last_modified).c_str() << CRLF;
1323  else
1324  strm << rfc822_date(t).c_str() << CRLF;
1325 
1326  if (type == dap4_ddx)
1327  strm << "Content-Type: text/xml" << CRLF;
1328  else
1329  strm << "Content-Type: text/plain" << CRLF;
1330 
1331  // Note that Content-Description is from RFC 2045 (MIME, pt 1), not 2616.
1332  // jhrg 12/23/05
1333  strm << "Content-Description: " << descrip[type] << CRLF;
1334  if (type == dods_error) // don't cache our error responses.
1335  strm << "Cache-Control: no-cache" << CRLF;
1336  // Don't write a Content-Encoding header for x-plain since that breaks
1337  // Netscape on NT. jhrg 3/23/97
1338  if (enc != x_plain)
1339  strm << "Content-Encoding: " << encoding[enc] << CRLF;
1340  strm << CRLF;
1341 }
1342 
1353 void ResponseBuilder::set_mime_html(ostream &strm, ObjectType type, EncodingType enc, const time_t last_modified, const string &protocol) const
1354 {
1355  strm << "HTTP/1.0 200 OK" << CRLF;
1356 
1357  strm << "XDODS-Server: " << DVR << CRLF;
1358  strm << "XOPeNDAP-Server: " << DVR << CRLF;
1359 
1360  if (protocol == "")
1361  strm << "XDAP: " << d_default_protocol << CRLF;
1362  else
1363  strm << "XDAP: " << protocol << CRLF;
1364 
1365  const time_t t = time(0);
1366  strm << "Date: " << rfc822_date(t).c_str() << CRLF;
1367 
1368  strm << "Last-Modified: ";
1369  if (last_modified > 0)
1370  strm << rfc822_date(last_modified).c_str() << CRLF;
1371  else
1372  strm << rfc822_date(t).c_str() << CRLF;
1373 
1374  strm << "Content-type: text/html" << CRLF;
1375  // See note above about Content-Description header. jhrg 12/23/05
1376  strm << "Content-Description: " << descrip[type] << CRLF;
1377  if (type == dods_error) // don't cache our error responses.
1378  strm << "Cache-Control: no-cache" << CRLF;
1379  // Don't write a Content-Encoding header for x-plain since that breaks
1380  // Netscape on NT. jhrg 3/23/97
1381  if (enc != x_plain)
1382  strm << "Content-Encoding: " << encoding[enc] << CRLF;
1383  strm << CRLF;
1384 }
1385 
1399 void ResponseBuilder::set_mime_binary(ostream &strm, ObjectType type, EncodingType enc, const time_t last_modified, const string &protocol) const
1400 {
1401  strm << "HTTP/1.0 200 OK" << CRLF;
1402 
1403  strm << "XDODS-Server: " << DVR << CRLF;
1404  strm << "XOPeNDAP-Server: " << DVR << CRLF;
1405 
1406  if (protocol == "")
1407  strm << "XDAP: " << d_default_protocol << CRLF;
1408  else
1409  strm << "XDAP: " << protocol << CRLF;
1410 
1411  const time_t t = time(0);
1412  strm << "Date: " << rfc822_date(t).c_str() << CRLF;
1413 
1414  strm << "Last-Modified: ";
1415  if (last_modified > 0)
1416  strm << rfc822_date(last_modified).c_str() << CRLF;
1417  else
1418  strm << rfc822_date(t).c_str() << CRLF;
1419 
1420  strm << "Content-Type: application/octet-stream" << CRLF;
1421  strm << "Content-Description: " << descrip[type] << CRLF;
1422  if (enc != x_plain)
1423  strm << "Content-Encoding: " << encoding[enc] << CRLF;
1424 
1425  strm << CRLF;
1426 }
1427 
1428 void ResponseBuilder::set_mime_multipart(ostream &strm, const string &boundary, const string &start, ObjectType type, EncodingType enc, const time_t last_modified, const string &protocol) const
1429 {
1430  strm << "HTTP/1.0 200 OK" << CRLF;
1431 
1432  strm << "XDODS-Server: " << DVR << CRLF;
1433  strm << "XOPeNDAP-Server: " << DVR << CRLF;
1434 
1435  if (protocol == "")
1436  strm << "XDAP: " << d_default_protocol << CRLF;
1437  else
1438  strm << "XDAP: " << protocol << CRLF;
1439 
1440  const time_t t = time(0);
1441  strm << "Date: " << rfc822_date(t).c_str() << CRLF;
1442 
1443  strm << "Last-Modified: ";
1444  if (last_modified > 0)
1445  strm << rfc822_date(last_modified).c_str() << CRLF;
1446  else
1447  strm << rfc822_date(t).c_str() << CRLF;
1448 
1449  strm << "Content-Type: Multipart/Related; boundary=" << boundary << "; start=\"<" << start << ">\"; type=\"Text/xml\"" << CRLF;
1450 
1451  strm << "Content-Description: " << descrip[type] << CRLF;
1452  if (enc != x_plain)
1453  strm << "Content-Encoding: " << encoding[enc] << CRLF;
1454 
1455  strm << CRLF;
1456 }
1457 
1458 void ResponseBuilder::set_mime_ddx_boundary(ostream &strm, const string &boundary, const string &cid, ObjectType type, EncodingType enc) const
1459 {
1460  strm << "--" << boundary << CRLF;
1461  strm << "Content-Type: Text/xml; charset=iso-8859-1" << CRLF;
1462  strm << "Content-Id: <" << cid << ">" << CRLF;
1463  strm << "Content-Description: " << descrip[type] << CRLF;
1464  if (enc != x_plain)
1465  strm << "Content-Encoding: " << encoding[enc] << CRLF;
1466 
1467  strm << CRLF;
1468 }
1469 
1470 void ResponseBuilder::set_mime_data_boundary(ostream &strm, const string &boundary, const string &cid, ObjectType type, EncodingType enc) const
1471 {
1472  strm << "--" << boundary << CRLF;
1473  strm << "Content-Type: application/octet-stream" << CRLF;
1474  strm << "Content-Id: <" << cid << ">" << CRLF;
1475  strm << "Content-Description: " << descrip[type] << CRLF;
1476  if (enc != x_plain)
1477  strm << "Content-Encoding: " << encoding[enc] << CRLF;
1478 
1479  strm << CRLF;
1480 }
1481 
1488 void ResponseBuilder::set_mime_error(ostream &strm, int code, const string &reason, const string &protocol) const
1489 {
1490  strm << "HTTP/1.0 " << code << " " << reason.c_str() << CRLF;
1491 
1492  strm << "XDODS-Server: " << DVR << CRLF;
1493  strm << "XOPeNDAP-Server: " << DVR << CRLF;
1494 
1495  if (protocol == "")
1496  strm << "XDAP: " << d_default_protocol << CRLF;
1497  else
1498  strm << "XDAP: " << protocol << CRLF;
1499 
1500  const time_t t = time(0);
1501  strm << "Date: " << rfc822_date(t).c_str() << CRLF;
1502  strm << "Cache-Control: no-cache" << CRLF;
1503  strm << CRLF;
1504 }
1505 
1506 } // namespace libdap
1507