bes  Updated for version 3.20.6
Chunk.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of the BES
4 
5 // Copyright (c) 2016 OPeNDAP, Inc.
6 // Author: Nathan Potter <ndp@opendap.org>
7 //
8 // This library is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
12 //
13 // This library is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public
19 // License along with this library; if not, write to the Free Software
20 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 //
22 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23 
24 #include "config.h"
25 
26 #include <sstream>
27 // #include <cstdlib>
28 #include <cstring>
29 #include <cassert>
30 
31 #include <zlib.h>
32 
33 #include <BESDebug.h>
34 #include <BESLog.h>
35 #include <BESInternalError.h>
36 #include <BESSyntaxUserError.h>
37 #include <BESContextManager.h>
38 
39 #include "xml2json/include/xml2json.hpp"
40 
42 #include "xml2json/include/rapidjson/writer.h"
43 //#include "xml2json/include/rapidjson/stringbuffer.h"
44 
45 #include "Chunk.h"
46 #include "CurlHandlePool.h"
47 #include "DmrppRequestHandler.h"
48 
49 using namespace std;
50 
51 namespace dmrpp {
52 
53 // This is used to track access to 'cloudydap' accesses in the S3 logs
54 // by adding a query string that will show up in those logs. This is
55 // activated by using a special BES context with the name 'cloudydap.'
56 const std::string Chunk::tracking_context = "cloudydap";
57 
71 size_t chunk_write_data(void *buffer, size_t size, size_t nmemb, void *data)
72 {
73  size_t nbytes = size * nmemb;
74 
75  // Peek into the bytes read and look for an error from the object store.
76  // Error messages always start off with '<?xml' so only check for one if we have more than
77  // four characters in 'buffer.' jhrg 12/17/19
78  if (nbytes > 4) {
79  string peek(reinterpret_cast<const char *>(buffer), 5);
80  if (peek == "<?xml") {
81  // At this point we no longer care about great performance - error msg readability
82  // is more important. jhrg 12/30/19
83  string xml_message = reinterpret_cast<const char *>(buffer);
84  xml_message.erase(xml_message.find_last_not_of("\t\n\v\f\r 0") + 1);
85  // Decode the AWS XML error message. In some cases this will fail because pub keys,
86  // which maybe in this error text, may have < or > chars in them. the XML parser
87  // will be sad if that happens. jhrg 12/30/19
88  try {
89  string json_message = xml2json(xml_message.c_str());
90  BESDEBUG("dmrpp", "AWS S3 Access Error:" << json_message << endl);
91  VERBOSE("AWS S3 Access Error:" << json_message << endl);
92 
94  d.Parse(json_message.c_str());
95  rapidjson::Value& s = d["Error"]["Message"];
96  // We might want to get the "Code" from the "Error" if these text messages
97  // are not good enough. But the "Code" is not really suitable for normal humans...
98  // jhrg 12/31/19
99 
100  throw BESSyntaxUserError(string("Error accessing object store data: ").append(s.GetString()), __FILE__, __LINE__);
101  }
102  catch (BESSyntaxUserError) {
103  // re-throw BESSyntaxUserError - added for the future if we make BESError a child
104  // of std::exception as it should be. jhrg 12/30/19
105  throw;
106  }
107  catch(std::exception &e) {
108  BESDEBUG("dmrpp", "AWS S3 Access Error:" << xml_message << endl);
109  VERBOSE("AWS S3 Access Error:" << xml_message << endl);
110  throw BESSyntaxUserError(string("Error accessing object store data: Unrecognized error, likely an authentication failure."), __FILE__, __LINE__);
111  }
112  }
113  }
114 
115  Chunk *c_ptr = reinterpret_cast<Chunk*>(data);
116 
117  // rbuf: |******++++++++++----------------------|
118  // ^ ^ bytes_read + nbytes
119  // | bytes_read
120 
121  unsigned long long bytes_read = c_ptr->get_bytes_read();
122 
123  // We might be expecting a small response but get an error document instead.
124  // These error responses are generally small (< 4k), so if nbytes is bigger
125  // than the read buffer for this chunk but < 4k, make it larger and move on.
126  // This will aid in error diagnosis. jhrg 11/26/19
127  // TODO Remove this once the above error trapping code works. jhrg 12/17/19
128  if (nbytes <= 4096 && nbytes > c_ptr->get_rbuf_size()) {
129  // set_rbuf() deletes the previous storage; Chunk manages the new memory block
130  c_ptr->set_rbuf(new char[nbytes+2], nbytes+2);
131  }
132 
133  // If this fails, the code will write beyond the buffer.
134  assert(bytes_read + nbytes <= c_ptr->get_rbuf_size());
135 
136  //TODO: Need to setup a unique_ptr to replace the buffer that get_rbuf() returns
137  //unique_ptr<char> new_c_ptr;
138  //new_c_ptr.reset(c_ptr->get_rbuf() + bytes_read);
139  memcpy(c_ptr->get_rbuf() + bytes_read, buffer, nbytes);
140 
141  c_ptr->set_bytes_read(bytes_read + nbytes);
142 
143  return nbytes;
144 }
145 
156 void inflate(char *dest, unsigned int dest_len, char *src, unsigned int src_len)
157 {
158  /* Sanity check */
159  assert(src_len > 0);
160  assert(src);
161  assert(dest_len > 0);
162  assert(dest);
163 
164  /* Input; uncompress */
165  z_stream z_strm; /* zlib parameters */
166 
167  /* Set the uncompression parameters */
168  memset(&z_strm, 0, sizeof(z_strm));
169  z_strm.next_in = (Bytef *) src;
170  z_strm.avail_in = src_len;
171  z_strm.next_out = (Bytef *) dest;
172  z_strm.avail_out = dest_len;
173 
174  /* Initialize the uncompression routines */
175  if (Z_OK != inflateInit(&z_strm))
176  throw BESError("Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
177 
178  /* Loop to uncompress the buffer */
179  int status = Z_OK;
180  do {
181  /* Uncompress some data */
182  status = inflate(&z_strm, Z_SYNC_FLUSH);
183 
184  /* Check if we are done uncompressing data */
185  if (Z_STREAM_END == status) break; /*done*/
186 
187  /* Check for error */
188  if (Z_OK != status) {
189  (void) inflateEnd(&z_strm);
190  throw BESError("Failed to inflate data chunk.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
191  }
192  else {
193  /* If we're not done and just ran out of buffer space, it's an error.
194  * The HDF5 library code would extend the buffer as needed, but for
195  * this handler, we always know the size of the uncompressed chunk.
196  */
197  if (0 == z_strm.avail_out) {
198  throw BESError("Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
199 #if 0
200  /* Here's how to extend the buffer if needed. This might be useful someday... */
201  void *new_outbuf; /* Pointer to new output buffer */
202 
203  /* Allocate a buffer twice as big */
204  nalloc *= 2;
205  if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
206  (void) inflateEnd(&z_strm);
207  HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0, "memory allocation failed for inflate decompression")
208  } /* end if */
209  outbuf = new_outbuf;
210 
211  /* Update pointers to buffer for next set of uncompressed data */
212  z_strm.next_out = (unsigned char*) outbuf + z_strm.total_out;
213  z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
214 #endif
215  } /* end if */
216  } /* end else */
217  } while (status == Z_OK);
218 
219  /* Finish uncompressing the stream */
220  (void) inflateEnd(&z_strm);
221 }
222 
// #define this to enable the duff's device loop unrolling code.
// jhrg 1/19/17
#define DUFFS_DEVICE

/**
 * @brief Reverse the HDF5 byte-shuffle filter (H5Zshuffle.c in the HDF5 sources).
 *
 * The shuffled layout stores byte 0 of every element, then byte 1 of every
 * element, and so on. This function restores the element-interleaved layout.
 * Bytes past the last whole element (src_size % width) were never shuffled;
 * they are copied to the end of dest unchanged.
 *
 * @param dest Output buffer; must hold at least src_size bytes.
 * @param src Shuffled input bytes.
 * @param src_size Number of bytes in src.
 * @param width Element width in bytes. Widths of 0 or 1, and inputs holding
 *        fewer than two whole elements, are copied unchanged.
 */
void unshuffle(char *dest, const char *src, unsigned int src_size, unsigned int width)
{
    // width == 0 made the division below a divide-by-zero. Treat it like a 1-byte
    // element: there is nothing to unshuffle.
    if (width <= 1) {
        memcpy(dest, src, src_size);
        return;
    }

    const unsigned int elems = src_size / width; // int division rounds down

    /* Don't do anything for "fractional" elements */
    if (elems <= 1) {
        memcpy(dest, src, src_size);
        return;
    }

    const char *in = src; // walks the shuffled source sequentially
    char *out = 0;        // strides through dest one element (width bytes) at a time

    /* Input; unshuffle */
    for (unsigned int i = 0; i < width; i++) {
        out = dest + i;
#ifndef DUFFS_DEVICE
        for (size_t j = elems; j > 0; j--) {
            *out = *in++;
            out += width;
        }
#else  /* DUFFS_DEVICE */
        {
            // elems > 1 here, so the do/while below always runs at least once.
            size_t duffs_index = (elems + 7) / 8; /* Counting index for Duff's device */
            // This macro saves repeating the same line 8 times
#define DUFF_GUTS *out = *in++; out += width;
            switch (elems % 8) {
            case 0:
                do {
                    DUFF_GUTS
            case 7:
                    DUFF_GUTS
            case 6:
                    DUFF_GUTS
            case 5:
                    DUFF_GUTS
            case 4:
                    DUFF_GUTS
            case 3:
                    DUFF_GUTS
            case 2:
                    DUFF_GUTS
            case 1:
                    DUFF_GUTS
                } while (--duffs_index > 0);
            } /* end switch */
#undef DUFF_GUTS
        } /* end block */
#endif /* DUFFS_DEVICE */
    } /* end for i = 0 to width */

    /* Compute the leftover bytes if there are any */
    const size_t leftover = src_size % width;

    /* Add leftover to the end of data */
    if (leftover > 0) {
        /* 'out' is one element past the last byte of the final byte-plane;
         * adjust back to the end of the shuffled bytes. */
        out -= (width - 1);
        memcpy(out, in, leftover);
    }
}
317 
331 void Chunk::set_position_in_array(const string &pia)
332 {
333  if (pia.empty()) return;
334 
335  if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
336 
337  // Assume input is [x,y,...,z] where x, ..., are integers; modest syntax checking
338  // [1] is a minimal 'position in array' string.
339  if (pia.find('[') == string::npos || pia.find(']') == string::npos || pia.length() < 3)
340  throw BESInternalError("while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
341 
342  if (pia.find_first_not_of("[]1234567890,") != string::npos)
343  throw BESInternalError("while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
344 
345  // string off []; iss holds x,y,...,z
346  istringstream iss(pia.substr(1, pia.length()-2));
347 
348  char c;
349  unsigned int i;
350  while (!iss.eof() ) {
351  iss >> i; // read an integer
352 
353  d_chunk_position_in_array.push_back(i);
354 
355  iss >> c; // read a separator (,)
356  }
357 }
358 
367 void Chunk::set_position_in_array(const std::vector<unsigned int> &pia)
368 {
369  if (pia.size() == 0) return;
370 
371  if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
372 
373  d_chunk_position_in_array = pia;
374 }
375 
383 string Chunk::get_curl_range_arg_string()
384 {
385  ostringstream range; // range-get needs a string arg for the range
386  range << d_offset << "-" << d_offset + d_size - 1;
387  return range.str();
388 }
389 
401 void Chunk::add_tracking_query_param()
402 {
417  string aws_s3_url_https("https://s3.amazonaws.com/");
418  string aws_s3_url_http("http://s3.amazonaws.com/");
419 
420  // Is it an AWS S3 access? (y.find(x) returns 0 when y starts with x)
421  if (d_data_url.find(aws_s3_url_https) == 0 || d_data_url.find(aws_s3_url_http) == 0) {
422  // Yup, headed to S3.
423  bool found = false;
424  string cloudydap_context_value = BESContextManager::TheManager()->get_context(tracking_context, found);
425  if (found) {
426  d_query_marker.append("?").append(tracking_context).append("=").append(cloudydap_context_value);
427  }
428  }
429 }
430 
431 #if 0
432 
443 void *inflate_chunk(void *arg_list)
444 {
445  inflate_chunk_args *args = reinterpret_cast<inflate_chunk_args*>(arg_list);
446 
447  try {
448  args->chunk->inflate_chunk(args->deflate, args->shuffle, args->chunk_size, args->elem_width);
449  }
450  catch (BESError &error) {
451  delete args;
452  pthread_exit(new BESError(error));
453  }
454 
455  delete args;
456  pthread_exit(NULL);
457 }
458 #endif
459 
460 
472 void Chunk::inflate_chunk(bool deflate, bool shuffle, unsigned int chunk_size, unsigned int elem_width)
473 {
474  // This code is pretty naive - there are apparently a number of
475  // different ways HDF5 can compress data, and it does also use a scheme
476  // where several algorithms can be applied in sequence. For now, get
477  // simple zlib deflate working.jhrg 1/15/17
478  // Added support for shuffle. Assuming unshuffle always is applied _after_
479  // inflating the data (reversing the shuffle --> deflate process). It is
480  // possible that data could just be deflated or shuffled (because we
481  // have test data are use only shuffle). jhrg 1/20/17
482  // The file that implements the deflate filter is H5Zdeflate.c in the hdf5 source.
483  // The file that implements the shuffle filter is H5Zshuffle.c.
484 
485  if (d_is_inflated)
486  return;
487 
488  chunk_size *= elem_width;
489 
490  if (deflate) {
491  char *dest = new char[chunk_size];
492  try {
493  inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
494  // This replaces (and deletes) the original read_buffer with dest.
495  set_rbuf(dest, chunk_size);
496  }
497  catch (...) {
498  delete[] dest;
499  throw;
500  }
501  }
502 
503  if (shuffle) {
504  // The internal buffer is chunk's full size at this point.
505  char *dest = new char[get_rbuf_size()];
506  try {
507  unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
508  set_rbuf(dest, get_rbuf_size());
509  }
510  catch (...) {
511  delete[] dest;
512  throw;
513  }
514  }
515 
516  d_is_inflated = true;
517 
518 #if 0 // This was handy during development for debugging. Keep it for awhile (year or two) before we drop it ndp - 01/18/17
519  if(BESDebug::IsSet("dmrpp")) {
520  unsigned long long chunk_buf_size = get_rbuf_size();
521  dods_float32 *vals = (dods_float32 *) get_rbuf();
522  ostream *os = BESDebug::GetStrm();
523  (*os) << std::fixed << std::setfill('_') << std::setw(10) << std::setprecision(0);
524  (*os) << "DmrppArray::"<< __func__ <<"() - Chunk[" << i << "]: " << endl;
525  for(unsigned long long k=0; k< chunk_buf_size/prototype()->width(); k++) {
526  (*os) << vals[k] << ", " << ((k==0)|((k+1)%10)?"":"\n");
527  }
528  }
529 #endif
530 }
531 
541 void Chunk::read_chunk()
542 {
543  if (d_is_read) {
544  BESDEBUG("dmrpp", "Chunk::"<< __func__ <<"() - Already been read! Returning." << endl);
545  return;
546  }
547 
548  set_rbuf_to_size();
549 
550  dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(this);
551  if (!handle)
552  throw BESInternalError("No more libcurl handles.", __FILE__, __LINE__);
553 
554  handle->read_data(); // throws BESInternalError if error
555 
556  DmrppRequestHandler::curl_handle_pool->release_handle(handle);
557 
558  // If the expected byte count was not read, it's an error.
559  if (get_size() != get_bytes_read()) {
560  ostringstream oss;
561  oss << "Wrong number of bytes read for chunk; read: " << get_bytes_read() << ", expected: " << get_size();
562  throw BESInternalError(oss.str(), __FILE__, __LINE__);
563  }
564 
565  d_is_read = true;
566 }
567 
578 void Chunk::dump(ostream &oss) const
579 {
580  oss << "Chunk";
581  oss << "[ptr='" << (void *)this << "']";
582  oss << "[data_url='" << d_data_url << "']";
583  oss << "[offset=" << d_offset << "]";
584  oss << "[size=" << d_size << "]";
585  oss << "[chunk_position_in_array=(";
586  for (unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
587  if (i) oss << ",";
588  oss << d_chunk_position_in_array[i];
589  }
590  oss << ")]";
591  oss << "[is_read=" << d_is_read << "]";
592  oss << "[is_inflated=" << d_is_inflated << "]";
593 }
594 
595 string Chunk::to_string() const
596 {
597  std::ostringstream oss;
598  dump(oss);
599  return oss.str();
600 }
601 
602 } // namespace dmrpp
603 
Document
GenericDocument< UTF8<> > Document
GenericDocument with UTF8 encoding.
Definition: cmr_module/rapidjson/document.h:2402
dmrpp::dmrpp_easy_handle
Bundle a libcurl easy handle to other information.
Definition: CurlHandlePool.h:61
dmrpp::dmrpp_easy_handle::read_data
void read_data()
This is the read_data() method for serial transfers.
Definition: CurlHandlePool.cc:313
document.h
BESSyntaxUserError
error thrown if there is a user syntax error in the request or any other user error
Definition: BESSyntaxUserError.h:41
BESDebug::IsSet
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
Definition: BESDebug.h:157
Value
GenericValue< UTF8<> > Value
GenericValue with UTF8 encoding.
Definition: cmr_module/rapidjson/document.h:2010
BESInternalError
exception thrown if internal error encountered
Definition: BESInternalError.h:43
BESDebug::GetStrm
static std::ostream * GetStrm()
return the debug stream
Definition: BESDebug.h:176
BESContextManager::get_context
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
Definition: BESContextManager.cc:77
BESError
Abstract exception class for the BES with basic string message.
Definition: BESError.h:58