35 #include <BESInternalError.h>
36 #include <BESSyntaxUserError.h>
37 #include <BESContextManager.h>
39 #include "xml2json/include/xml2json.hpp"
42 #include "xml2json/include/rapidjson/writer.h"
46 #include "CurlHandlePool.h"
47 #include "DmrppRequestHandler.h"
// Name of the BES context whose value Chunk::add_tracking_query_param() looks
// up and appends, as "?<name>=<value>", to d_query_marker for AWS S3 data URLs.
56 const std::string Chunk::tracking_context =
"cloudydap";
/**
 * @brief Receive bytes for a Chunk transfer.
 *
 * The signature matches a libcurl CURLOPT_WRITEFUNCTION callback; presumably
 * it is installed on the easy handles from CurlHandlePool (confirm). 'data' is
 * reinterpreted as the Chunk being filled. A payload that begins with "<?xml"
 * is treated as an AWS S3 error document and reported by throwing
 * BESSyntaxUserError; any other payload is copied into the Chunk read buffer
 * at the current fill position.
 *
 * @param buffer Bytes delivered by the transfer (size * nmemb of them).
 * @param size Size of one item, in bytes.
 * @param nmemb Number of items.
 * @param data Opaque user pointer; must point at a Chunk.
 * @throws BESSyntaxUserError when the payload is an XML error document.
 */
71 size_t chunk_write_data(
void *buffer,
size_t size,
size_t nmemb,
void *data)
// Total number of bytes delivered in this call.
73 size_t nbytes = size * nmemb;
// Sniff the first bytes: an XML declaration means an error document arrived
// instead of chunk data.
// NOTE(review): 5 bytes are copied even when nbytes < 5, which can read past
// the delivered data -- consider std::min<size_t>(nbytes, 5).
79 string peek(
reinterpret_cast<const char *
>(buffer), 5);
80 if (peek ==
"<?xml") {
// NOTE(review): libcurl does not NUL-terminate write-callback data, so
// building the string from a bare char pointer can read past nbytes --
// consider string(reinterpret_cast<const char *>(buffer), nbytes).
83 string xml_message =
reinterpret_cast<const char *
>(buffer);
// Trim trailing whitespace. NOTE(review): the set also contains the digit
// '0' (not a NUL), so trailing zero digits are stripped too -- confirm intent.
84 xml_message.erase(xml_message.find_last_not_of(
"\t\n\v\f\r 0") + 1);
// Convert the XML error document to JSON, log it, then parse the JSON so the
// S3 error text can be extracted.
89 string json_message = xml2json(xml_message.c_str());
90 BESDEBUG(
"dmrpp",
"AWS S3 Access Error:" << json_message << endl);
91 VERBOSE(
"AWS S3 Access Error:" << json_message << endl);
94 d.Parse(json_message.c_str());
// Report the extracted text to the user. The declaration of 's' is not
// visible in this listing (presumably a rapidjson string buffer -- confirm).
100 throw BESSyntaxUserError(
string(
"Error accessing object store data: ").append(s.GetString()), __FILE__, __LINE__);
// Fallback when a std::exception escapes the conversion/parsing above: log
// the raw XML and report a generic message.
107 catch(std::exception &e) {
108 BESDEBUG(
"dmrpp",
"AWS S3 Access Error:" << xml_message << endl);
109 VERBOSE(
"AWS S3 Access Error:" << xml_message << endl);
110 throw BESSyntaxUserError(
string(
"Error accessing object store data: Unrecognized error, likely an authentication failure."), __FILE__, __LINE__);
// Normal data path: 'data' is the Chunk whose read buffer receives the bytes.
115 Chunk *c_ptr =
reinterpret_cast<Chunk*
>(data);
// Current fill level; new bytes are appended at this offset.
121 unsigned long long bytes_read = c_ptr->get_bytes_read();
// A short payload (at most 4096 bytes) that does not fit the current buffer
// gets a fresh buffer of nbytes + 2 bytes.
// NOTE(review): bytes already read are not carried over into the new buffer;
// presumably this only happens for a first, short response -- confirm.
128 if (nbytes <= 4096 && nbytes > c_ptr->get_rbuf_size()) {
130 c_ptr->set_rbuf(
new char[nbytes+2], nbytes+2);
// NOTE(review): this assert is the only visible bounds check before the
// memcpy and it is compiled out under NDEBUG, so an oversized transfer would
// overflow the read buffer in release builds. Consider throwing instead.
134 assert(bytes_read + nbytes <= c_ptr->get_rbuf_size());
139 memcpy(c_ptr->get_rbuf() + bytes_read, buffer, nbytes);
// Advance the fill level past the bytes just copied.
141 c_ptr->set_bytes_read(bytes_read + nbytes);
/**
 * @brief Decompress a zlib/deflate stream into a caller-supplied buffer.
 *
 * @param dest Destination buffer; dest_len must be > 0 (asserted).
 * @param dest_len Capacity of dest in bytes.
 * @param src Compressed input.
 * @param src_len Number of compressed bytes in src.
 * @throws BESError (BES_INTERNAL_ERROR) if zlib fails to initialize or to
 * inflate, or if dest fills up before the end of the compressed stream.
 */
156 void inflate(
char *dest,
unsigned int dest_len,
char *src,
unsigned int src_len)
161 assert(dest_len > 0);
// Point zlib at the whole input and the whole output buffer.
168 memset(&z_strm, 0,
sizeof(z_strm));
169 z_strm.next_in = (Bytef *) src;
170 z_strm.avail_in = src_len;
171 z_strm.next_out = (Bytef *) dest;
172 z_strm.avail_out = dest_len;
175 if (Z_OK != inflateInit(&z_strm))
176 throw BESError(
"Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// Loop body: this call resolves to the zlib inflate(z_streamp, int) overload,
// not to this function.
182 status = inflate(&z_strm, Z_SYNC_FLUSH);
// Done once zlib reports the end of the compressed stream.
185 if (Z_STREAM_END == status)
break;
// Any other non-Z_OK status is an error; zlib state is released first.
188 if (Z_OK != status) {
189 (void) inflateEnd(&z_strm);
190 throw BESError(
"Failed to inflate data chunk.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// Output buffer exhausted while the stream has not yet ended.
// NOTE(review): unlike the error path above, no inflateEnd() is visible
// before this throw, so zlib internal state appears to leak -- confirm.
197 if (0 == z_strm.avail_out) {
198 throw BESError(
"Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// NOTE(review): the buffer-growing code below uses HDF5 internals
// (H5MM_realloc, HGOTO_ERROR) and names (outbuf, nalloc, new_outbuf) that are
// not declared in this listing; presumably it is disabled in the full source
// (for example inside #if 0) -- confirm before relying on it.
205 if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
206 (void) inflateEnd(&z_strm);
207 HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0,
"memory allocation failed for inflate decompression")
212 z_strm.next_out = (
unsigned char*) outbuf + z_strm.total_out;
213 z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
217 }
while (status == Z_OK);
// Release zlib state on the normal exit path.
220 (void) inflateEnd(&z_strm);
/**
 * @brief Undo a byte-shuffle of an array of fixed-width elements.
 *
 * src is read sequentially, and each byte is written 'width' bytes after the
 * previous one in dest, once per byte position i in [0, width). So src is
 * expected to hold 'width' byte planes of 'elems' bytes each. Trailing bytes
 * that do not form a whole element (src_size % width of them) are copied
 * unchanged. For width <= 1, or fewer than two elements, the data is copied
 * as-is.
 *
 * @param dest Output buffer; presumably must hold src_size bytes.
 * @param src Shuffled input.
 * @param src_size Number of bytes in src.
 * @param width Element width in bytes.
 */
248 void unshuffle(
char *dest,
const char *src,
unsigned int src_size,
unsigned int width)
// NOTE(review): no guard against width == 0 is visible before this division.
250 unsigned int elems = src_size / width;
// With 1-byte elements or at most one element the layout is unchanged.
253 if (!(width > 1 && elems > 1)) {
254 memcpy(dest,
const_cast<char*
>(src), src_size);
// Read cursor into src; the const_cast only yields a mutable pointer type,
// src is never written through it.
258 char *_src =
const_cast<char*
>(src);
// One pass per byte position within an element.
262 for (
unsigned int i = 0; i < width; i++) {
// Copy loop unrolled 8x (Duff's device): duffs_index counts passes of 8.
274 size_t duffs_index = (elems + 7) / 8;
// Guard for an impossible remainder (presumably the default case of a switch
// on elems % 8 whose other lines are not visible here).
277 assert(0 &&
"This Should never be executed!");
// Copy one byte and step dest forward by one element.
282 #define DUFF_GUTS *_dest = *_src++; _dest += width;
299 }
while (--duffs_index > 0);
// Bytes beyond the last whole element are appended verbatim.
307 size_t leftover = src_size % width;
// Step back from the last plane position to just past the final whole
// element (assuming _dest started at dest + i for plane i -- confirm).
312 _dest -= (width - 1);
313 memcpy((
void*) _dest, (
void*) _src, leftover);
/**
 * @brief Set this chunk's position in the array from its DMR++ text form.
 *
 * The accepted form is a bracketed, comma-separated list of non-negative
 * integers such as "[0,12,3]". An empty string is ignored.
 *
 * @param pia Position-in-array string from the DMR++.
 * @throws BESInternalError if the string lacks '[' or ']', is shorter than
 * three characters, or contains characters other than digits, commas and
 * brackets.
 */
331 void Chunk::set_position_in_array(
const string &pia)
333 if (pia.empty())
return;
// NOTE(review): the previous position is discarded before the string is
// validated, so a malformed string leaves the chunk with no position.
335 if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
// Basic shape check. NOTE(review): this only requires '[' and ']' to appear
// somewhere, not as the first and last characters, yet the substr() below
// assumes they are.
339 if (pia.find(
'[') == string::npos || pia.find(
']') == string::npos || pia.length() < 3)
340 throw BESInternalError(
"while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
// Only digits, commas and brackets are allowed (no spaces or signs).
342 if (pia.find_first_not_of(
"[]1234567890,") != string::npos)
343 throw BESInternalError(
"while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
// Drop the surrounding brackets and read the comma-separated values.
346 istringstream iss(pia.substr(1, pia.length()-2));
// NOTE(review): an eof()-driven loop never ends if an extraction fails before
// end of input (e.g. "[1,,2]" passes the checks above), because failbit does
// not set eofbit -- confirm that the loop body, not fully visible here,
// checks the stream state.
350 while (!iss.eof() ) {
353 d_chunk_position_in_array.push_back(i);
/**
 * @brief Set this chunk's position in the array from an already-parsed vector.
 *
 * An empty vector is ignored and leaves any existing position unchanged;
 * otherwise the stored position is replaced by a copy of pia.
 *
 * @param pia Position-in-array indices.
 */
367 void Chunk::set_position_in_array(
const std::vector<unsigned int> &pia)
369 if (pia.size() == 0)
return;
// For a std::vector member this clear() is redundant: the assignment below
// already replaces the contents.
371 if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
373 d_chunk_position_in_array = pia;
/**
 * @brief Build this chunk's byte range as "first-last" with an inclusive end
 * (d_offset to d_offset + d_size - 1), for use as a curl range argument
 * (presumably CURLOPT_RANGE -- confirm).
 *
 * NOTE(review): when d_size is 0 the end offset precedes the start (and wraps
 * around if the members are unsigned and d_offset is 0).
 */
383 string Chunk::get_curl_range_arg_string()
386 range << d_offset <<
"-" << d_offset + d_size - 1;
/**
 * @brief Add a tracking query parameter for AWS S3 data URLs.
 *
 * If d_data_url starts with the path-style endpoint
 * "https://s3.amazonaws.com/" or "http://s3.amazonaws.com/", the value of the
 * BES context named by tracking_context is looked up and
 * "?<tracking_context>=<value>" is appended to d_query_marker.
 *
 * NOTE(review): 'found' is not visibly checked before the append, so an unset
 * context may still produce a parameter with an empty value -- confirm.
 * Virtual-hosted S3 URLs (bucket.s3.amazonaws.com) do not match the prefix
 * test.
 */
401 void Chunk::add_tracking_query_param()
417 string aws_s3_url_https(
"https://s3.amazonaws.com/");
418 string aws_s3_url_http(
"http://s3.amazonaws.com/");
// find(...) == 0 acts as a starts-with test.
421 if (d_data_url.find(aws_s3_url_https) == 0 || d_data_url.find(aws_s3_url_http) == 0) {
424 string cloudydap_context_value = BESContextManager::TheManager()->
get_context(tracking_context, found);
426 d_query_marker.append(
"?").append(tracking_context).append(
"=").append(cloudydap_context_value);
/**
 * @brief Adapter from a void *(void *) entry point to Chunk::inflate_chunk.
 *
 * The signature suggests a pthread start routine (confirm). arg_list must
 * point at an inflate_chunk_args, whose fields are forwarded to
 * args->chunk->inflate_chunk().
 *
 * NOTE(review): Chunk::inflate_chunk can throw BESError (through inflate()).
 * An exception must not escape a thread start routine; the handling, if any,
 * is not visible here -- confirm.
 */
443 void *inflate_chunk(
void *arg_list)
445 inflate_chunk_args *args =
reinterpret_cast<inflate_chunk_args*
>(arg_list);
448 args->chunk->inflate_chunk(args->deflate, args->shuffle, args->chunk_size, args->elem_width);
/**
 * @brief Decompress and/or unshuffle this chunk's data in its read buffer.
 *
 * chunk_size is multiplied by elem_width, so it is presumably an element
 * count on entry.
 *
 * On the decompression path, a new buffer of that many bytes receives the
 * inflated data and replaces the read buffer. On the unshuffle path, a buffer
 * of the current read-buffer size receives the unshuffled bytes and replaces
 * it. d_is_inflated is then set to true.
 *
 * @param deflate Presumably: the data is deflate-compressed.
 * @param shuffle Presumably: the data is byte-shuffled.
 * @param chunk_size Chunk size, presumably in elements.
 * @param elem_width Element width in bytes.
 * @throws BESError from inflate() on decompression failure.
 */
472 void Chunk::inflate_chunk(
bool deflate,
bool shuffle,
unsigned int chunk_size,
unsigned int elem_width)
// Convert to bytes. NOTE(review): unsigned int multiplication can wrap for
// very large chunks.
488 chunk_size *= elem_width;
// NOTE(review): if inflate() throws, dest is leaked; holding it in a
// std::unique_ptr<char[]> until set_rbuf() takes it would fix that (assuming
// set_rbuf takes ownership -- confirm). get_rbuf_size() is also passed as the
// unsigned int src_len parameter, which may narrow.
491 char *dest =
new char[chunk_size];
493 inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
495 set_rbuf(dest, chunk_size);
505 char *dest =
new char[get_rbuf_size()];
507 unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
508 set_rbuf(dest, get_rbuf_size());
516 d_is_inflated =
true;
// Disabled debug dump, apparently copied from DmrppArray (see its log
// prefix); it uses names (os, i, prototype()) not defined in this function.
518 #if 0 // This was handy during development for debugging. Keep it for awhile (year or two) before we drop it ndp - 01/18/17
520 unsigned long long chunk_buf_size = get_rbuf_size();
521 dods_float32 *vals = (dods_float32 *) get_rbuf();
523 (*os) << std::fixed << std::setfill(
'_') << std::setw(10) << std::setprecision(0);
524 (*os) <<
"DmrppArray::"<< __func__ <<
"() - Chunk[" << i <<
"]: " << endl;
525 for(
unsigned long long k=0; k< chunk_buf_size/prototype()->width(); k++) {
526 (*os) << vals[k] <<
", " << ((k==0)|((k+1)%10)?
"":
"\n");
/**
 * @brief Read this chunk's bytes using a handle from the shared curl pool.
 *
 * If the chunk has already been read, a debug message is logged ("Already
 * been read! Returning."). Otherwise an easy handle for this chunk is taken
 * from DmrppRequestHandler::curl_handle_pool and later released. The transfer
 * call itself is not visible in this listing. Afterwards the number of bytes
 * read is compared with get_size(), and an error message is built when they
 * differ (presumably then thrown -- confirm).
 *
 * NOTE(review): if the transfer between get_easy_handle() and
 * release_handle() throws, the handle is not released unless code not shown
 * here handles it. A small RAII guard would make this exception-safe.
 */
541 void Chunk::read_chunk()
544 BESDEBUG(
"dmrpp",
"Chunk::"<< __func__ <<
"() - Already been read! Returning." << endl);
550 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(
this);
556 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
// Verify that exactly the expected number of bytes arrived.
559 if (get_size() != get_bytes_read()) {
561 oss <<
"Wrong number of bytes read for chunk; read: " << get_bytes_read() <<
", expected: " << get_size();
/**
 * @brief Write a bracketed, human-readable summary of this chunk to oss.
 *
 * The summary includes the object address, data URL, offset, size, position
 * in array, and the is_read / is_inflated flags.
 *
 * @param oss Stream to write to.
 */
578 void Chunk::dump(ostream &oss)
const
// The cast to void* prints the object address.
581 oss <<
"[ptr='" << (
void *)
this <<
"']";
582 oss <<
"[data_url='" << d_data_url <<
"']";
583 oss <<
"[offset=" << d_offset <<
"]";
584 oss <<
"[size=" << d_size <<
"]";
585 oss <<
"[chunk_position_in_array=(";
586 for (
unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
588 oss << d_chunk_position_in_array[i];
591 oss <<
"[is_read=" << d_is_read <<
"]";
592 oss <<
"[is_inflated=" << d_is_inflated <<
"]";
/**
 * @brief Return a textual description of this chunk.
 *
 * The text is accumulated in a local std::ostringstream.
 */
595 string Chunk::to_string()
const
597 std::ostringstream oss;