DmrppArray.cc (bes, updated for version 3.20.6)
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of the BES
4 
5 // Copyright (c) 2016 OPeNDAP, Inc.
6 // Author: James Gallagher <jgallagher@opendap.org>
7 //
8 // This library is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
12 //
13 // This library is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public
19 // License along with this library; if not, write to the Free Software
20 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 //
22 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23 
24 #include "config.h"
25 
26 #include <string>
27 #include <sstream>
28 #include <iomanip>
29 #include <vector>
30 #include <queue>
31 #include <iterator>
32 
33 #include <cstring>
34 #include <cassert>
35 #include <cerrno>
36 
37 #include <pthread.h>
38 #include <cmath>
39 
40 #include <unistd.h>
41 
42 #include <D4Enum.h>
43 #include <D4EnumDefs.h>
44 #include <D4Attributes.h>
45 #include <D4Maps.h>
46 #include <D4Group.h>
47 
48 #include "BESLog.h"
49 #include "BESInternalError.h"
50 #include "BESDebug.h"
51 
52 #include "CurlHandlePool.h"
53 #include "Chunk.h"
54 #include "DmrppArray.h"
55 #include "DmrppRequestHandler.h"
56 
57 // Used with BESDEBUG
58 static const string dmrpp_3 = "dmrpp:3";
59 static const string dmrpp_4 = "dmrpp:4";
60 
61 using namespace libdap;
62 using namespace std;
63 
64 #define MB (1024*1024)
65 
66 namespace dmrpp {
67 
68 void DmrppArray::_duplicate(const DmrppArray &)
69 {
70 }
71 
72 DmrppArray::DmrppArray(const string &n, BaseType *v) :
73  Array(n, v, true /*is dap4*/), DmrppCommon()
74 {
75 }
76 
77 DmrppArray::DmrppArray(const string &n, const string &d, BaseType *v) :
78  Array(n, d, v, true), DmrppCommon()
79 {
80 }
81 
82 BaseType *
83 DmrppArray::ptr_duplicate()
84 {
85  return new DmrppArray(*this);
86 }
87 
88 DmrppArray::DmrppArray(const DmrppArray &rhs) :
89  Array(rhs), DmrppCommon(rhs)
90 {
91  _duplicate(rhs);
92 }
93 
94 DmrppArray &
95 DmrppArray::operator=(const DmrppArray &rhs)
96 {
97  if (this == &rhs) return *this;
98 
99  dynamic_cast<Array &>(*this) = rhs; // run Constructor=
100 
101  _duplicate(rhs);
102  DmrppCommon::m_duplicate_common(rhs);
103 
104  return *this;
105 }
106 
111 bool DmrppArray::is_projected()
112 {
113  for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
114  if (dimension_size(p, true) != dimension_size(p, false)) return true;
115 
116  return false;
117 }
118 
137 static unsigned long long get_index(const vector<unsigned int> &address_in_target, const vector<unsigned int> &target_shape)
138 {
139  assert(address_in_target.size() == target_shape.size()); // ranks must be equal
140 
141  vector<unsigned int>::const_reverse_iterator shape_index = target_shape.rbegin();
142  vector<unsigned int>::const_reverse_iterator index = address_in_target.rbegin(), index_end = address_in_target.rend();
143 
144  unsigned long long multiplier = *shape_index++;
145  unsigned long long offset = *index++;
146 
147  while (index != index_end) {
148  assert(*index < *shape_index); // index < shape for each dim
149 
150  offset += multiplier * *index++;
151  multiplier *= *shape_index++;
152  }
153 
154  return offset;
155 }
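// A worked example with illustrative values (not taken from any particular file):
// for target_shape = {4, 5, 6} and address_in_target = {1, 2, 3}, the loop above
// computes
//     offset = 3 + 2*6 + 1*(6*5) = 45,
// the same value as the usual row-major formula ((1*5) + 2)*6 + 3.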
156 
163 unsigned long long DmrppArray::get_size(bool constrained)
164 {
165  // number of array elements in the constrained array
166  unsigned long long size = 1;
167  for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
168  size *= dimension_size(dim, constrained);
169  }
170  return size;
171 }
172 
179 vector<unsigned int> DmrppArray::get_shape(bool constrained)
180 {
181  Dim_iter dim = dim_begin(), edim = dim_end();
182  vector<unsigned int> shape;
183 
184  // For a 3d array, this method took 14ms without reserve(), 5ms with
185  // (when called many times).
186  shape.reserve(edim - dim);
187 
188  for (; dim != edim; dim++) {
189  shape.push_back(dimension_size(dim, constrained));
190  }
191 
192  return shape;
193 }
194 
200 DmrppArray::dimension DmrppArray::get_dimension(unsigned int i)
201 {
202  assert(i < (dim_end() - dim_begin()));
203  return *(dim_begin() + i);
204 }
205 
211 void DmrppArray::insert_constrained_contiguous(Dim_iter dimIter, unsigned long *target_index, vector<unsigned int> &subsetAddress,
212  const vector<unsigned int> &array_shape, char /*Chunk*/*src_buf)
213 {
214  BESDEBUG("dmrpp", "DmrppArray::"<< __func__ << "() - subsetAddress.size(): " << subsetAddress.size() << endl);
215 
216  unsigned int bytesPerElt = prototype()->width();
217 
218  char *dest_buf = get_buf();
219 
220  unsigned int start = this->dimension_start(dimIter, true);
221  unsigned int stop = this->dimension_stop(dimIter, true);
222  unsigned int stride = this->dimension_stride(dimIter, true);
223 
224  dimIter++;
225 
226  // The end case for the recursion is dimIter == dim_end(); stride == 1 is an optimization
227  // See the else clause for the general case.
228  if (dimIter == dim_end() && stride == 1) {
229  // For the start and stop indexes of the subset, get the matching indexes in the whole array.
230  subsetAddress.push_back(start);
231  unsigned long start_index = get_index(subsetAddress, array_shape);
232  subsetAddress.pop_back();
233 
234  subsetAddress.push_back(stop);
235  unsigned long stop_index = get_index(subsetAddress, array_shape);
236  subsetAddress.pop_back();
237 
238  // Copy data block from start_index to stop_index
239  // TODO Replace this loop with a call to std::memcpy()
240  for (unsigned long sourceIndex = start_index; sourceIndex <= stop_index; sourceIndex++) {
241  unsigned long target_byte = *target_index * bytesPerElt;
242  unsigned long source_byte = sourceIndex * bytesPerElt;
243  // Copy a single value.
244  for (unsigned long i = 0; i < bytesPerElt; i++) {
245  dest_buf[target_byte++] = src_buf[source_byte++];
246  }
247  (*target_index)++;
248  }
249  }
250  else {
251  for (unsigned int myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {
252 
253  // Is it the last dimension?
254  if (dimIter != dim_end()) {
255  // Nope!
256  // then we recurse to the last dimension to read stuff
257  subsetAddress.push_back(myDimIndex);
258  insert_constrained_contiguous(dimIter, target_index, subsetAddress, array_shape, src_buf);
259  subsetAddress.pop_back();
260  }
261  else {
262  // We are at the last (inner most) dimension.
263  // So it's time to copy values.
264  subsetAddress.push_back(myDimIndex);
265  unsigned int sourceIndex = get_index(subsetAddress, array_shape);
266  subsetAddress.pop_back();
267 
268  // Copy a single value.
269  unsigned long target_byte = *target_index * bytesPerElt;
270  unsigned long source_byte = sourceIndex * bytesPerElt;
271 
272  for (unsigned int i = 0; i < bytesPerElt; i++) {
273  dest_buf[target_byte++] = src_buf[source_byte++];
274  }
275  (*target_index)++;
276  }
277  }
278  }
279 }
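// A sketch of how the recursion above plays out, using assumed values: for a
// 2-D array of shape {4, 6} constrained to [1:2][2:4] (stride 1 in both
// dimensions), the outer call loops over rows 1 and 2 and recurses; for row 1
// the last-dimension case computes start_index = get_index({1,2}, {4,6}) = 8
// and stop_index = get_index({1,4}, {4,6}) = 10, copying source elements 8..10
// into target elements 0..2, and row 2 then copies source elements 14..16 into
// target elements 3..5.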
280 
286 void *one_chunk_unconstrained_thread(void *arg_list)
287 {
288  one_chunk_unconstrained_args *args = reinterpret_cast<one_chunk_unconstrained_args*>(arg_list);
289 
290  try {
291  process_one_chunk_unconstrained(args->chunk, args->array, args->array_shape, args->chunk_shape);
292  }
293  catch (BESError &error) {
294  write(args->fds[1], &args->tid, sizeof(args->tid));
295  delete args;
296  pthread_exit(new string(error.get_verbose_message()));
297  }
298 
299  // tid is a char and thus is written atomically. Writing it tells the parent
300  // thread that this child is complete and that it should call pthread_join(tid, ...)
301  write(args->fds[1], &args->tid, sizeof(args->tid));
302 
303  delete args;
304  pthread_exit(NULL);
305 }
306 
315 void *one_child_chunk_thread(void *arg_list)
316 {
317  one_child_chunk_args *args = reinterpret_cast<one_child_chunk_args*>(arg_list);
318 
319  try {
320  args->child_chunk->read_chunk();
321 
322  assert(args->master_chunk->get_rbuf());
323  assert(args->child_chunk->get_rbuf());
324  assert(args->child_chunk->get_bytes_read() == args->child_chunk->get_size());
325 
326  // master offset \/
327  // master chunk: mmmmmmmmmmmmmmmm
328  // child chunks: 1111222233334444 (there are four child chunks)
329  // child offsets: ^ ^ ^ ^
330  // For this example, child_1_offset - master_offset == 0 (that's always true),
331  // child_2_offset - master_offset == 4, child_3_offset - master_offset == 8,
332  // and child_4_offset - master_offset == 12.
333  // Those are the starting locations within the data buffer of the master chunk
334  // where each child chunk should be written.
335  // Note: all of the offset values start at the beginning of the file.
336 
337  unsigned int offset_within_master_chunk = args->child_chunk->get_offset() - args->master_chunk->get_offset();
338 
339  memcpy(args->master_chunk->get_rbuf() + offset_within_master_chunk, args->child_chunk->get_rbuf(), args->child_chunk->get_bytes_read());
340  }
341  catch (BESError &error) {
342  write(args->fds[1], &args->tid, sizeof(args->tid));
343 
344  delete args;
345  pthread_exit(new string(error.get_verbose_message()));
346  }
347 
348  // tid is a char and thus is written atomically. Writing it tells the parent
349  // thread that this child is complete and that it should call pthread_join(tid, ...)
350  write(args->fds[1], &args->tid, sizeof(args->tid));
351 
352  delete args;
353  pthread_exit(NULL);
354 }
355 
361 void DmrppArray::read_contiguous()
362 {
363  // These first four lines reproduce DmrppCommon::read_atomic(). The call
364  // to Chunk::inflate_chunk() handles 'contiguous' data that are compressed.
365  // And since we need the chunk, I copied the read_atomic() code here.
366 
367  vector<Chunk> &chunk_refs = get_chunk_vec();
368 
369  if (chunk_refs.size() != 1) throw BESInternalError(string("Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
370 
371  // This is the original chunk for this 'contiguous' variable.
372  Chunk &master_chunk = chunk_refs[0];
373 
374  unsigned long long master_chunk_size = master_chunk.get_size();
375 
376  // Read the chunk in parallel only when it is above some threshold. jhrg 9/21/19
377  // Only use a parallel read if the chunk is over the configured minimum size (d_min_size);
378  // otherwise it is easier to just read it as-is. kln 9/23/19
379  if (DmrppRequestHandler::d_use_parallel_transfers && master_chunk_size > DmrppRequestHandler::d_min_size) {
380 
381  // Allocate memory for the 'master chunk' so the threads can transfer data
382  // from the child chunks to it.
383  master_chunk.set_rbuf_to_size();
384 
385  // The number of child chunks is determined based on the size of the data.
386  // If the size of the master chunk is 3MB then 3 chunks will be made. We will round down
387  // when necessary and handle the remainder later on (3.3MB = 3 chunks, 4.2MB = 4 chunks, etc.) kln 9/23/19
388  unsigned int num_chunks = floor(master_chunk_size/MB);
389  if ( num_chunks >= DmrppRequestHandler::d_max_parallel_transfers)
390  num_chunks = DmrppRequestHandler::d_max_parallel_transfers;
391 
392  // This pipe is used by the child threads to indicate completion
393  int fds[2];
394  int status = pipe(fds);
395  if (status < 0)
396  throw BESInternalError(string("Could not open a pipe for thread communication: ").append(strerror(errno)), __FILE__, __LINE__);
397 
398  // Use the original chunk's size and offset to evenly split it into smaller chunks
399  unsigned long long chunk_size = master_chunk_size / num_chunks;
400  unsigned long long chunk_offset = master_chunk.get_offset();
401 
402  // If the size of the master chunk is not evenly divisible by num_chunks, capture
403  // the remainder here and increase the size of the last chunk by this number of bytes.
404  unsigned int chunk_remainder = master_chunk.get_size() % num_chunks;
405 
406  string chunk_url = master_chunk.get_data_url();
407 
408  // Set up a queue to break up the original master_chunk and keep track of the pieces
409  queue<Chunk *> chunks_to_read;
410 
411  for (unsigned int i = 0; i < num_chunks-1; i++) {
412  chunks_to_read.push(new Chunk(chunk_url, chunk_size, (chunk_size * i) + chunk_offset));
413  }
414  // See above for details about chunk_remainder. jhrg 9/21/19
415  chunks_to_read.push(new Chunk(chunk_url, chunk_size + chunk_remainder, (chunk_size * (num_chunks-1)) + chunk_offset));
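// A numeric illustration with assumed values (and assuming d_max_parallel_transfers
// is at least 4): for a 4,500,001-byte master chunk at file offset 0, num_chunks is
// floor(4,500,001 / MB) = 4, chunk_size = 1,125,000 and chunk_remainder = 1, so the
// queue holds child chunks at offsets 0, 1125000, 2250000 and 3375000, the last one
// being 1,125,001 bytes long.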
416 
417  // Start the max number of processing pipelines
418  pthread_t threads[DmrppRequestHandler::d_max_parallel_transfers];
419  memset(&threads[0], 0, sizeof(pthread_t) * DmrppRequestHandler::d_max_parallel_transfers);
420 
421  try {
422  unsigned int num_threads = 0;
423 
424  // start initial set of threads
425  for (unsigned int i = 0; i < (unsigned int) DmrppRequestHandler::d_max_parallel_transfers && chunks_to_read.size() > 0; ++i) {
426  Chunk *current_chunk = chunks_to_read.front();
427  chunks_to_read.pop();
428 
429  // thread number is 'i'
430  one_child_chunk_args *args = new one_child_chunk_args(fds, i, current_chunk, &master_chunk);
431  int status = pthread_create(&threads[i], NULL, dmrpp::one_child_chunk_thread, (void*) args);
432 
433  if (status == 0) {
434  ++num_threads;
435  BESDEBUG(dmrpp_3, "started thread: " << i << endl);
436  }
437  else {
438  ostringstream oss("Could not start one_child_chunk_thread for master_chunk ", std::ios::ate);
439  oss << i << ": " << strerror(status);
440  BESDEBUG(dmrpp_3, oss.str());
441  throw BESInternalError(oss.str(), __FILE__, __LINE__);
442  }
443  }
444 
445  // Now join the child threads, creating replacement threads if needed
446  while (num_threads > 0) {
447  unsigned char tid; // bytes can be written atomically
448  // Block here until a child thread writes to the pipe, then read the byte
449  int bytes = ::read(fds[0], &tid, sizeof(tid));
450  if (bytes != sizeof(tid))
451  throw BESInternalError(string("Could not read the thread id: ").append(strerror(errno)), __FILE__, __LINE__);
452 
453  if (!(tid < DmrppRequestHandler::d_max_parallel_transfers)) {
454  ostringstream oss("Invalid thread id read after thread exit: ", std::ios::ate);
455  oss << tid;
456  throw BESInternalError(oss.str(), __FILE__, __LINE__);
457  }
458 
459  string *error;
460  int status = pthread_join(threads[tid], (void**)&error);
461  --num_threads;
462  BESDEBUG(dmrpp_3, "joined thread: " << (unsigned int)tid << ", there are: " << num_threads << endl);
463 
464  if (status != 0) {
465  ostringstream oss("Could not join one_child_chunk_thread for master_chunk ", std::ios::ate);
466  oss << tid << ": " << strerror(status);
467  throw BESInternalError(oss.str(), __FILE__, __LINE__);
468  }
469  else if (error != 0) {
470  BESInternalError e(*error, __FILE__, __LINE__);
471  delete error;
472  throw e;
473  }
474  else if (chunks_to_read.size() > 0) {
475  Chunk *current_chunk = chunks_to_read.front();
476  chunks_to_read.pop();
477 
478  // thread number is 'tid,' the number of the thread that just completed
479  one_child_chunk_args *args = new one_child_chunk_args(fds, tid, current_chunk, &master_chunk);
480  int status = pthread_create(&threads[tid], NULL, dmrpp::one_child_chunk_thread, (void*) args);
481 
482  if (status != 0) {
483  ostringstream oss;
484  oss << "Could not start one_child_chunk_thread for master_chunk " << tid << ": " << strerror(status);
485  throw BESInternalError(oss.str(), __FILE__, __LINE__);
486  }
487  ++num_threads;
488  BESDEBUG(dmrpp_3, "started thread: " << (unsigned int)tid << ", there are: " << num_threads << endl);
489  }
490  }
491 
492  // Once done with the threads, close the communication pipe.
493  close(fds[0]);
494  close(fds[1]);
495  }
496  catch (...) {
497  // cancel all the threads, otherwise we'll have threads out there using up resources
498  // defined in DmrppCommon.cc
499  join_threads(threads, DmrppRequestHandler::d_max_parallel_transfers);
500  // close the pipe used to communicate with the child threads
501  close(fds[0]);
502  close(fds[1]);
503  // re-throw the exception
504  throw;
505  }
506  }
507  else {
508  // Else read the master_chunk as is
509  master_chunk.read_chunk();
510  }
511 
512  master_chunk.inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());
513 
514  // 'master_chunk' now holds the data. Transfer it to the Array.
515 
516  if (!is_projected()) { // if there is no projection constraint
517  val2buf(master_chunk.get_rbuf()); // yes, it's not type-safe
518  }
519  else { // apply the constraint
520  vector<unsigned int> array_shape = get_shape(false);
521 
522  // Reserve space in this array for the constrained size of the data request
523  reserve_value_capacity(get_size(true));
524  unsigned long target_index = 0;
525  vector<unsigned int> subset;
526 
527  insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, master_chunk.get_rbuf());
528  }
529 
530  set_read_p(true);
531 }
532 
545 unsigned long long DmrppArray::get_chunk_start(const dimension &thisDim, unsigned int chunk_origin)
546 {
547  // What's the first element that we are going to access for this dimension of the chunk?
548  unsigned long long first_element_offset = 0; // start with 0
549  if ((unsigned) (thisDim.start) < chunk_origin) {
550  // If the start is behind this chunk, then it's special.
551  if (thisDim.stride != 1) {
552  // And if the stride isn't 1, we have to figure out where to begin in this chunk.
553  first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
554  // If it's zero, great!
555  if (first_element_offset != 0) {
556  // otherwise we adjust to get the correct first element.
557  first_element_offset = thisDim.stride - first_element_offset;
558  }
559  }
560  }
561  else {
562  first_element_offset = thisDim.start - chunk_origin;
563  }
564 
565  return first_element_offset;
566 }
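// A worked example with assumed values: for thisDim.start = 2, thisDim.stride = 3
// and chunk_origin = 10, the constraint touches elements 2, 5, 8, 11, ...;
// (10 - 2) % 3 == 2, so first_element_offset becomes 3 - 2 = 1, which selects
// element 11, the first constrained element at or after this chunk's origin.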
567 
568 #ifdef USE_READ_SERIAL
569 
590 void DmrppArray::insert_chunk_serial(unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
591  Chunk *chunk)
592 {
593  BESDEBUG("dmrpp", __func__ << " dim: "<< dim << " BEGIN "<< endl);
594 
595  // The size, in elements, of each of the chunk's dimensions.
596  const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
597 
598  // The chunk's origin point a.k.a. its "position in array".
599  const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
600 
601  dimension thisDim = this->get_dimension(dim);
602 
603  // Do we even want this chunk?
604  if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (unsigned) thisDim.stop < chunk_origin[dim]) {
605  return; // No. No, we do not. Skip this.
606  }
607 
608  // What's the first element that we are going to access for this dimension of the chunk?
609  unsigned int first_element_offset = get_chunk_start(thisDim, chunk_origin[dim]);
610 
611  // Is the next point to be sent in this chunk at all? If no, return.
612  if (first_element_offset > chunk_shape[dim]) {
613  return;
614  }
615 
616  // Now we figure out the correct last element, based on the subset expression
617  unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
618  if ((unsigned) thisDim.stop < end_element) {
619  end_element = thisDim.stop;
620  }
621 
622  unsigned long long chunk_start = first_element_offset; //start_element - chunk_origin[dim];
623  unsigned long long chunk_end = end_element - chunk_origin[dim];
624  vector<unsigned int> constrained_array_shape = get_shape(true);
625 
626  unsigned int last_dim = chunk_shape.size() - 1;
627  if (dim == last_dim) {
628  // Read and Process chunk
629  chunk->read_chunk();
630 
631  chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());
632 
633  char *source_buffer = chunk->get_rbuf();
634  char *target_buffer = get_buf();
635  unsigned int elem_width = prototype()->width();
636 
637  if (thisDim.stride == 1) {
638  // The start element in this array
639  unsigned long long start_element = chunk_origin[dim] + first_element_offset;
640  // Compute how much we are going to copy
641  unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
642 
643  // Compute where we need to put it.
644  (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
645  // Compute where we are going to read it from
646  (*chunk_element_address)[dim] = first_element_offset;
647 
648  unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
649  unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
650 
651  memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
652  }
653  else {
654  // Stride != 1
655  for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
656  // Compute where we need to put it.
657  (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
658 
659  // Compute where we are going to read it from
660  (*chunk_element_address)[dim] = chunk_index;
661 
662  unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
663  unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
664 
665  memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
666  }
667  }
668  }
669  else {
670  // Not the last dimension, so we proceed down the recursion branch.
671  for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
672  (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
673  (*chunk_element_address)[dim] = chunk_index;
674 
675  // Re-entry here:
676  insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
677  }
678  }
679 }
680 
681 void DmrppArray::read_chunks_serial()
682 {
683  BESDEBUG("dmrpp", __func__ << " for variable '" << name() << "' - BEGIN" << endl);
684 
685  vector<Chunk> &chunk_refs = get_chunk_vec();
686  if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
687 
688  // Allocate target memory.
689  reserve_value_capacity(get_size(true));
690 
691  /*
692  * Find the chunks to be read, make curl_easy handles for them, and
693  * stuff them into our curl_multi handle. This is a recursive activity
694  * which utilizes the same code that copies the data from the chunk to
695  * the variables.
696  */
697  for (unsigned long i = 0; i < chunk_refs.size(); i++) {
698  Chunk &chunk = chunk_refs[i];
699 
700  vector<unsigned int> chunk_source_address(dimensions(), 0);
701  vector<unsigned int> target_element_address = chunk.get_position_in_array();
702 
703  // Recursive insertion operation.
704  insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
705  }
706 
707  set_read_p(true);
708 
709  BESDEBUG("dmrpp", "DmrppArray::"<< __func__ << "() for " << name() << " END"<< endl);
710 }
711 #endif
712 
734 Chunk *
735 DmrppArray::find_needed_chunks(unsigned int dim, vector<unsigned int> *target_element_address, Chunk *chunk)
736 {
737  BESDEBUG(dmrpp_3, __func__ << " BEGIN, dim: " << dim << endl);
738 
739  // The size, in elements, of each of the chunk's dimensions.
740  const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
741 
742  // The chunk's origin point a.k.a. its "position in array".
743  const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
744 
745  dimension thisDim = this->get_dimension(dim);
746 
747  // Do we even want this chunk?
748  if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (unsigned) thisDim.stop < chunk_origin[dim]) {
749  return 0; // No. No, we do not. Skip this chunk.
750  }
751 
752  // What's the first element that we are going to access for this dimension of the chunk?
753  unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
754 
755  // Is the next point to be sent in this chunk at all? If no, return.
756  if (chunk_start > chunk_shape[dim]) {
757  return 0;
758  }
759 
760  // Now we figure out the correct last element, based on the subset expression
761  unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
762  if ((unsigned) thisDim.stop < end_element) {
763  end_element = thisDim.stop;
764  }
765 
766  unsigned long long chunk_end = end_element - chunk_origin[dim];
767 
768  unsigned int last_dim = chunk_shape.size() - 1;
769  if (dim == last_dim) {
770  return chunk;
771  }
772  else {
773  // Not the last dimension, so we proceed down the recursion branch.
774  for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
775  (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
776 
777  // Re-entry here:
778  Chunk *needed = find_needed_chunks(dim + 1, target_element_address, chunk);
779  if (needed) return needed;
780  }
781  }
782 
783  return 0;
784 }
785 
805 void DmrppArray::insert_chunk(unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
806  Chunk *chunk, const vector<unsigned int> &constrained_array_shape)
807 {
808  // The size, in elements, of each of the chunk's dimensions.
809  const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
810 
811  // The chunk's origin point a.k.a. its "position in array".
812  const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
813 
814  dimension thisDim = this->get_dimension(dim);
815 
816  // What's the first element that we are going to access for this dimension of the chunk?
817  unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
818 
819  // Now we figure out the correct last element, based on the subset expression
820  unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
821  if ((unsigned) thisDim.stop < end_element) {
822  end_element = thisDim.stop;
823  }
824 
825  unsigned long long chunk_end = end_element - chunk_origin[dim];
826 
827  unsigned int last_dim = chunk_shape.size() - 1;
828  if (dim == last_dim) {
829  char *source_buffer = chunk->get_rbuf();
830  char *target_buffer = get_buf();
831  unsigned int elem_width = prototype()->width();
832 
833  if (thisDim.stride == 1) {
834  // The start element in this array
835  unsigned long long start_element = chunk_origin[dim] + chunk_start;
836  // Compute how much we are going to copy
837  unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
838 
839  // Compute where we need to put it.
840  (*target_element_address)[dim] = (start_element - thisDim.start); // / thisDim.stride;
841  // Compute where we are going to read it from
842  (*chunk_element_address)[dim] = chunk_start;
843 
844  // See below re get_index()
845  unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
846  unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
847 
848  memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
849  }
850  else {
851  // Stride != 1
852  for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
853  // Compute where we need to put it.
854  (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
855 
856  // Compute where we are going to read it from
857  (*chunk_element_address)[dim] = chunk_index;
858 
859  // These calls to get_index() can be removed as with the insert...unconstrained() code.
860  unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
861  unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
862 
863  memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
864  }
865  }
866  }
867  else {
868  // Not the last dimension, so we proceed down the recursion branch.
869  for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
870  (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
871  (*chunk_element_address)[dim] = chunk_index;
872 
873  // Re-entry here:
874  insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape);
875  }
876  }
877 }
878 
885 void DmrppArray::read_chunks()
886 {
887  vector<Chunk> &chunk_refs = get_chunk_vec();
888  if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
889 
890  // Find all the chunks to read. I used a queue to preserve the chunk order, which
891  // made using a debugger easier. However, order does not matter, AFAIK.
892  queue<Chunk *> chunks_to_read;
893 
894  // Look at all the chunks
895  for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c) {
896  Chunk &chunk = *c;
897 
898  vector<unsigned int> target_element_address = chunk.get_position_in_array();
899  Chunk *needed = find_needed_chunks(0 /* dimension */, &target_element_address, &chunk);
900  if (needed) chunks_to_read.push(needed);
901  }
902 
903  reserve_value_capacity(get_size(true));
904  vector<unsigned int> constrained_array_shape = get_shape(true);
905 
906  BESDEBUG(dmrpp_3, "d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
907  BESDEBUG(dmrpp_3, "d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);
908 
909  if (DmrppRequestHandler::d_use_parallel_transfers) {
910  // This is the parallel version of the code. It reads a set of chunks in parallel
911  // using the multi curl API, then inserts them, then reads the next set, ... jhrg 5/1/18
912  unsigned int max_handles = DmrppRequestHandler::curl_handle_pool->get_max_handles();
913  dmrpp_multi_handle *mhandle = DmrppRequestHandler::curl_handle_pool->get_multi_handle();
914 
915  // Look only at the chunks we need, found above. jhrg 4/30/18
916  while (chunks_to_read.size() > 0) {
917  queue<Chunk*> chunks_to_insert;
918  for (unsigned int i = 0; i < max_handles && chunks_to_read.size() > 0; ++i) {
919  Chunk *chunk = chunks_to_read.front();
920  chunks_to_read.pop();
921 
922  chunk->set_rbuf_to_size();
923  dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(chunk);
924  if (!handle) throw BESInternalError("No more libcurl handles.", __FILE__, __LINE__);
925 
926  BESDEBUG(dmrpp_3, "Queuing: " << chunk->to_string() << endl);
927  mhandle->add_easy_handle(handle);
928 
929  chunks_to_insert.push(chunk);
930  }
931 
932  mhandle->read_data(); // read, then remove the easy_handles
933 
934  while (chunks_to_insert.size() > 0) {
935  Chunk *chunk = chunks_to_insert.front();
936  chunks_to_insert.pop();
937 
938  chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());
939 
940  vector<unsigned int> target_element_address = chunk->get_position_in_array();
941  vector<unsigned int> chunk_source_address(dimensions(), 0);
942 
943  BESDEBUG(dmrpp_3, "Inserting: " << chunk->to_string() << endl);
944  insert_chunk(0 /* dimension */, &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
945  }
946  }
947  }
948  else {
949  // This version is the 'serial' version of the code. It reads a chunk, inserts it,
950  // reads the next one, and so on.
951  while (chunks_to_read.size() > 0) {
952  Chunk *chunk = chunks_to_read.front();
953  chunks_to_read.pop();
954 
955  BESDEBUG(dmrpp_3, "Reading: " << chunk->to_string() << endl);
956  chunk->read_chunk();
957 
958  chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());
959 
960  vector<unsigned int> target_element_address = chunk->get_position_in_array();
961  vector<unsigned int> chunk_source_address(dimensions(), 0);
962 
963  BESDEBUG(dmrpp_3, "Inserting: " << chunk->to_string() << endl);
964  insert_chunk(0 /* dimension */, &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
965  }
966  }
967 
968  set_read_p(true);
969 }
970 
984 static unsigned long multiplier(const vector<unsigned int> &shape, unsigned int k)
985 {
986  assert(shape.size() > 1);
987  assert(shape.size() > k + 1);
988 
989  vector<unsigned int>::const_iterator i = shape.begin(), e = shape.end();
990  advance(i, k + 1);
991  unsigned long multiplier = *i++;
992  while (i != e) {
993  multiplier *= *i++;
994  }
995 
996  return multiplier;
997 }
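// For example (illustrative shape): with shape = {4, 5, 6}, multiplier(shape, 0)
// returns 5 * 6 = 30 and multiplier(shape, 1) returns 6 -- the number of elements
// spanned by one step in dimension k of a row-major array.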
998 
1018 void DmrppArray::insert_chunk_unconstrained(Chunk *chunk, unsigned int dim, unsigned long long array_offset, const vector<unsigned int> &array_shape,
1019  unsigned long long chunk_offset, const vector<unsigned int> &chunk_shape, const vector<unsigned int> &chunk_origin)
1020 {
1021  // Now we figure out the correct last element. It's possible that a
1022  // chunk 'extends beyond' the Array bounds. Here 'end_element' is the
1023  // last element of the destination array
1024  dimension thisDim = this->get_dimension(dim);
1025  unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1026  if ((unsigned) thisDim.stop < end_element) {
1027  end_element = thisDim.stop;
1028  }
1029 
1030  unsigned long long chunk_end = end_element - chunk_origin[dim];
1031 
1032  unsigned int last_dim = chunk_shape.size() - 1;
1033  if (dim == last_dim) {
1034  unsigned int elem_width = prototype()->width();
1035 
1036  array_offset += chunk_origin[dim];
1037 
1038  // Compute how much we are going to copy
1039  unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * elem_width;
1040  char *source_buffer = chunk->get_rbuf();
1041  char *target_buffer = get_buf();
1042  memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
1043  }
1044  else {
1045  unsigned long mc = multiplier(chunk_shape, dim);
1046  unsigned long ma = multiplier(array_shape, dim);
1047 
1048  // Not the last dimension, so we proceed down the recursion branch.
1049  for (unsigned int chunk_index = 0 /*chunk_start*/; chunk_index <= chunk_end; ++chunk_index) {
1050  unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
1051  unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));
1052 
1053  // Re-entry here:
1054  insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape, chunk_origin);
1055  }
1056  }
1057 }
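// An illustrative trace with assumed shapes: for array_shape = {8, 10},
// chunk_shape = {4, 5} and a chunk whose position-in-array is {4, 5}, the
// dim 0 pass uses ma = 10 and mc = 5; for chunk_index = 0 it recurses with
// array_offset = 10 * (0 + 4) = 40 and chunk_offset = 0, and the dim 1 (last)
// pass then adds chunk_origin[1], copying 5 elements to array element 45,
// i.e. array position (4, 5).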
1058 
1067 void process_one_chunk_unconstrained(Chunk *chunk, DmrppArray *array, const vector<unsigned int> &array_shape,
1068  const vector<unsigned int> &chunk_shape)
1069 {
1070  chunk->read_chunk();
1071 
1072  if (array->is_deflate_compression() || array->is_shuffle_compression())
1073  chunk->inflate_chunk(array->is_deflate_compression(), array->is_shuffle_compression(), array->get_chunk_size_in_elements(),
1074  array->var()->width());
1075 
1076  array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
1077 }
1078 
1079 void DmrppArray::read_chunks_unconstrained()
1080 {
1081  vector<Chunk> &chunk_refs = get_chunk_vec();
1082  if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
1083 
1084  reserve_value_capacity(get_size());
1085 
1086  // The size, in elements, of each of the array's dimensions
1087  const vector<unsigned int> array_shape = get_shape(true);
1088  // The size, in elements, of each of the chunk's dimensions
1089  const vector<unsigned int> chunk_shape = get_chunk_dimension_sizes();
1090 
1091  BESDEBUG(dmrpp_3, __func__ << endl);
1092 
1093  BESDEBUG(dmrpp_3, "d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
1094  BESDEBUG(dmrpp_3, "d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);
1095 
1096  if (DmrppRequestHandler::d_use_parallel_transfers) {
1097  queue<Chunk *> chunks_to_read;
1098 
1099  // Queue all of the chunks
1100  for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c)
1101  chunks_to_read.push(&(*c));
1102 
1103  // This pipe is used by the child threads to indicate completion
1104  int fds[2];
1105  int status = pipe(fds);
1106  if (status < 0)
1107  throw BESInternalError(string("Could not open a pipe for thread communication: ").append(strerror(errno)), __FILE__, __LINE__);
1108 
1109  // Start the max number of processing pipelines
1110  pthread_t threads[DmrppRequestHandler::d_max_parallel_transfers];
1111  memset(&threads[0], 0, sizeof(pthread_t) * DmrppRequestHandler::d_max_parallel_transfers);
1112 
1113 #if 0
1114  // set the thread[] elements to null - this serves as a sentinel value
1115  for (unsigned int i = 0; i < (unsigned int)DmrppRequestHandler::d_max_parallel_transfers; ++i) {
1116  memset(&threads[i], 0, sizeof(pthread_t));
1117  }
1118 #endif
1119 
1120 
1121  try {
1122  unsigned int num_threads = 0;
1123  for (unsigned int i = 0; i < (unsigned int) DmrppRequestHandler::d_max_parallel_transfers && chunks_to_read.size() > 0; ++i) {
1124  Chunk *chunk = chunks_to_read.front();
1125  chunks_to_read.pop();
1126 
1127  // thread number is 'i'
1128  one_chunk_unconstrained_args *args = new one_chunk_unconstrained_args(fds, i, chunk, this, array_shape, chunk_shape);
1129  int status = pthread_create(&threads[i], NULL, dmrpp::one_chunk_unconstrained_thread, (void*) args);
1130  if (status == 0) {
1131  ++num_threads;
1132  BESDEBUG(dmrpp_3, "started thread: " << i << endl);
1133  }
1134  else {
1135  ostringstream oss("Could not start process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
1136  oss << i << ": " << strerror(status);
1137  BESDEBUG(dmrpp_3, oss.str());
1138  throw BESInternalError(oss.str(), __FILE__, __LINE__);
1139  }
1140  }
1141 
1142  // Now join the child threads, creating replacement threads if needed
1143  while (num_threads > 0) {
1144  unsigned char tid; // bytes can be written atomically
1145  // Block here until a child thread writes to the pipe, then read the byte
1146  int bytes = ::read(fds[0], &tid, sizeof(tid));
1147  if (bytes != sizeof(tid))
1148  throw BESInternalError(string("Could not read the thread id: ").append(strerror(errno)), __FILE__, __LINE__);
1149 
1150  if (!(tid < DmrppRequestHandler::d_max_parallel_transfers)) {
1151  ostringstream oss("Invalid thread id read after thread exit: ", std::ios::ate);
1152  oss << tid;
1153  throw BESInternalError(oss.str(), __FILE__, __LINE__);
1154  }
1155 
1156  string *error;
1157  int status = pthread_join(threads[tid], (void**)&error);
1158  --num_threads;
1159  BESDEBUG(dmrpp_3, "joined thread: " << (unsigned int)tid << ", there are: " << num_threads << endl);
1160 
1161  if (status != 0) {
1162  ostringstream oss("Could not join process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
1163  oss << tid << ": " << strerror(status);
1164  throw BESInternalError(oss.str(), __FILE__, __LINE__);
1165  }
1166  else if (error != 0) {
1167  BESInternalError e(*error, __FILE__, __LINE__);
1168  delete error;
1169  throw e;
1170  }
1171  else if (chunks_to_read.size() > 0) {
1172  Chunk *chunk = chunks_to_read.front();
1173  chunks_to_read.pop();
1174 
1175  // thread number is 'tid,' the number of the thread that just completed
1176  one_chunk_unconstrained_args *args = new one_chunk_unconstrained_args(fds, tid, chunk, this, array_shape, chunk_shape);
1177  int status = pthread_create(&threads[tid], NULL, dmrpp::one_chunk_unconstrained_thread, (void*) args);
1178  if (status != 0) {
1179  ostringstream oss;
1180  oss << "Could not start process_one_chunk_unconstrained thread for chunk " << tid << ": " << strerror(status);
1181  throw BESInternalError(oss.str(), __FILE__, __LINE__);
1182  }
1183  ++num_threads;
1184  BESDEBUG(dmrpp_3, "started thread: " << (unsigned int)tid << ", there are: " << num_threads << endl);
1185  }
1186  }
1187 
1188  // Once done with the threads, close the communication pipe.
1189  close(fds[0]);
1190  close(fds[1]);
1191  }
1192  catch (...) {
1193  // cancel all the threads, otherwise we'll have threads out there using up resources
1194  // defined in DmrppCommon.cc
1195  join_threads(threads, DmrppRequestHandler::d_max_parallel_transfers);
1196  // close the pipe used to communicate with the child threads
1197  close(fds[0]);
1198  close(fds[1]);
1199  // re-throw the exception
1200  throw;
1201  }
1202  }
1203  else { // Serial transfers
1204  for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c) {
1205  Chunk &chunk = *c;
1206  process_one_chunk_unconstrained(&chunk, this, array_shape, chunk_shape);
1207  }
1208  }
1209 
1210  set_read_p(true);
1211 }
1212 
1224 bool DmrppArray::read()
1225 {
1226  if (read_p()) return true;
1227 
1228  // Single chunk and 'contiguous' are the same for this code.
1229 
1230  if (get_immutable_chunks().size() == 1 || get_chunk_dimension_sizes().empty()) {
1231  BESDEBUG(dmrpp_4, "Calling read_contiguous() for " << name() << endl);
1232  read_contiguous(); // Throws on various errors
1233  }
1234  else { // Handle the more complex case where the data is chunked.
1235  if (!is_projected()) {
1236  BESDEBUG(dmrpp_4, "Calling read_chunks_unconstrained() for " << name() << endl);
1237  read_chunks_unconstrained();
1238  }
1239  else {
1240  BESDEBUG(dmrpp_4, "Calling read_chunks() for " << name() << endl);
1241  read_chunks();
1242  }
1243  }
1244 
1245  return true;
1246 }
1247 
1251 class PrintD4ArrayDimXMLWriter: public unary_function<Array::dimension&, void> {
1253  XMLWriter &xml;
1254  // Was this variable constrained using local/direct slicing? i.e., is d_local_constraint set?
1255  // If so, don't use shared dimensions; instead emit Dim elements that are anonymous.
1256  bool d_constrained;
1257 public:
1258 
1259  PrintD4ArrayDimXMLWriter(XMLWriter &xml, bool c) :
1260  xml(xml), d_constrained(c)
1261  {
1262  }
1263 
1264  void operator()(Array::dimension &d)
1265  {
1266  // This duplicates code in D4Dimensions (where D4Dimension::print_dap4() is defined)
1267  // because of the need to print the constrained size of a dimension. I think that
1268  // the constraint information has to be kept here and not in the dimension (since they
1269  // are shared dims). Could hack print_dap4() to take the constrained size, however.
1270  if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar*) "Dim") < 0)
1271  throw InternalErr(__FILE__, __LINE__, "Could not write Dim element");
1272 
1273  string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
1274  // If there is a name, there must be a Dimension (named dimension) in scope
1275  // so write its name but not its size.
1276  if (!d_constrained && !name.empty()) {
1277  if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar*) "name", (const xmlChar*) name.c_str()) < 0)
1278  throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
1279  }
1280  else if (d.use_sdim_for_slice) {
1281  assert(!name.empty());
1282  if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar*) "name", (const xmlChar*) name.c_str()) < 0)
1283  throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
1284  }
1285  else {
1286  ostringstream size;
1287  size << (d_constrained ? d.c_size : d.size);
1288  if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar*) "size", (const xmlChar*) size.str().c_str()) < 0)
1289  throw InternalErr(__FILE__, __LINE__, "Could not write attribute for size");
1290  }
1291 
1292  if (xmlTextWriterEndElement(xml.get_writer()) < 0) throw InternalErr(__FILE__, __LINE__, "Could not end Dim element");
1293  }
1294 };
1295 
1296 class PrintD4ConstructorVarXMLWriter: public unary_function<BaseType*, void> {
1297  XMLWriter &xml;
1298  bool d_constrained;
1299 public:
1300  PrintD4ConstructorVarXMLWriter(XMLWriter &xml, bool c) :
1301  xml(xml), d_constrained(c)
1302  {
1303  }
1304 
1305  void operator()(BaseType *btp)
1306  {
1307  btp->print_dap4(xml, d_constrained);
1308  }
1309 };
1310 
1311 class PrintD4MapXMLWriter: public unary_function<D4Map*, void> {
1312  XMLWriter &xml;
1313 
1314 public:
1315  PrintD4MapXMLWriter(XMLWriter &xml) :
1316  xml(xml)
1317  {
1318  }
1319 
1320  void operator()(D4Map *m)
1321  {
1322  m->print_dap4(xml);
1323  }
1324 };
1326 
1350 void DmrppArray::print_dap4(XMLWriter &xml, bool constrained /*false*/)
1351 {
1352  if (constrained && !send_p()) return;
1353 
1354  if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar*) var()->type_name().c_str()) < 0)
1355  throw InternalErr(__FILE__, __LINE__, "Could not write " + type_name() + " element");
1356 
1357  if (!name().empty())
1358  if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar*) "name", (const xmlChar*) name().c_str()) < 0)
1359  throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
1360 
1361  // Hack job... Copied from D4Enum::print_xml_writer. jhrg 11/12/13
1362  if (var()->type() == dods_enum_c) {
1363  D4Enum *e = static_cast<D4Enum*>(var());
1364  string path = e->enumeration()->name();
1365  if (e->enumeration()->parent()) {
1366  // print the FQN for the enum def; D4Group::FQN() includes the trailing '/'
1367  path = static_cast<D4Group*>(e->enumeration()->parent()->parent())->FQN() + path;
1368  }
1369  if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar*) "enum", (const xmlChar*) path.c_str()) < 0)
1370  throw InternalErr(__FILE__, __LINE__, "Could not write attribute for enum");
1371  }
1372 
1373  if (prototype()->is_constructor_type()) {
1374  Constructor &c = static_cast<Constructor&>(*prototype());
1375  for_each(c.var_begin(), c.var_end(), PrintD4ConstructorVarXMLWriter(xml, constrained));
1376  // bind2nd(mem_fun_ref(&BaseType::print_dap4), xml));
1377  }
1378 
1379  // Drop the local_constraint which is per-array and use a per-dimension one instead
1380  for_each(dim_begin(), dim_end(), PrintD4ArrayDimXMLWriter(xml, constrained));
1381 
1382  attributes()->print_dap4(xml);
1383 
1384  for_each(maps()->map_begin(), maps()->map_end(), PrintD4MapXMLWriter(xml));
1385 
1386  // Only print the chunks info if it is present. This is the code added to libdap::Array::print_dap4().
1387  // jhrg 5/10/18
1388  if (DmrppCommon::d_print_chunks && get_immutable_chunks().size() > 0) print_chunks_element(xml, DmrppCommon::d_ns_prefix);
1389 
1390  if (xmlTextWriterEndElement(xml.get_writer()) < 0) throw InternalErr(__FILE__, __LINE__, "Could not end " + type_name() + " element");
1391 }
1392 
1393 void DmrppArray::dump(ostream & strm) const
1394 {
1395  strm << BESIndent::LMarg << "DmrppArray::" << __func__ << "(" << (void *) this << ")" << endl;
1396  BESIndent::Indent();
1397  DmrppCommon::dump(strm);
1398  Array::dump(strm);
1399  strm << BESIndent::LMarg << "value: " << "----" << /*d_buf <<*/endl;
1400  BESIndent::UnIndent();
1401 }
1402 
1403 } // namespace dmrpp