bes  Updated for version 3.20.6
DmrppD4Opaque.cc
1 
2 // -*- mode: c++; c-basic-offset:4 -*-
3 
4 // This file is part of the BES
5 
6 // Copyright (c) 2016 OPeNDAP, Inc.
7 // Author: James Gallagher <jgallagher@opendap.org>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 //
23 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24 
25 #include "config.h"
26 
27 #include <cstring>
28 
29 #include <string>
30 #include <queue>
31 
32 #include <BESInternalError.h>
33 #include <BESDebug.h>
34 
35 #include "CurlHandlePool.h"
36 #include "DmrppRequestHandler.h"
37 #include "DmrppD4Opaque.h"
38 
39 using namespace libdap;
40 using namespace std;
41 
42 namespace dmrpp {
43 
44 void
45 DmrppD4Opaque::_duplicate(const DmrppD4Opaque &)
46 {
47 }
48 
49 DmrppD4Opaque::DmrppD4Opaque(const string &n) : D4Opaque(n), DmrppCommon()
50 {
51 }
52 
53 DmrppD4Opaque::DmrppD4Opaque(const string &n, const string &d) : D4Opaque(n, d), DmrppCommon()
54 {
55 }
56 
57 BaseType *
58 DmrppD4Opaque::ptr_duplicate()
59 {
60  return new DmrppD4Opaque(*this);
61 }
62 
63 DmrppD4Opaque::DmrppD4Opaque(const DmrppD4Opaque &rhs) : D4Opaque(rhs), DmrppCommon(rhs)
64 {
65  _duplicate(rhs);
66 }
67 
68 DmrppD4Opaque &
69 DmrppD4Opaque::operator=(const DmrppD4Opaque &rhs)
70 {
71  if (this == &rhs)
72  return *this;
73 
74  dynamic_cast<D4Opaque &>(*this) = rhs; // run Constructor=
75 
76  _duplicate(rhs);
77  DmrppCommon::m_duplicate_common(rhs);
78 
79  return *this;
80 }
81 
82 void DmrppD4Opaque::insert_chunk(Chunk *chunk)
83 {
84  // The size, in elements, of each of the chunk's dimensions.
85  const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
86  if (chunk_shape.size() != 1) throw BESInternalError("Opaque variables' chunks can only have one dimension.", __FILE__, __LINE__);
87 
88  // The chunk's origin point a.k.a. its "position in array".
89  const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
90 
91  char *source_buffer = chunk->get_rbuf();
92  unsigned char *target_buffer = get_buf();
93 
94  memcpy(target_buffer + chunk_origin[0], source_buffer, chunk_shape[0]);
95 }
96 
97 void DmrppD4Opaque::read_chunks_parallel()
98 {
99  vector<Chunk> &chunk_refs = get_chunk_vec();
100  if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
101 
102  // This is not needed here - Opaque is never constrained - but using it
103  // means we can reuse the DmrppArray::read_chunks_parallel() method's logic.
104  // TODO Replace with a more efficient version. jhrg 5/3/18
105  queue<Chunk*> chunks_to_read;
106 
107  // Look at all the chunks
108  for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c) {
109  chunks_to_read.push(&*c);
110  }
111 
112  // FIXME Call the new resize() method using num_of_chunks * chunk_size here
113 
114  if (DmrppRequestHandler::d_use_parallel_transfers) {
115  // This is the parallel version of the code. It reads a set of chunks in parallel
116  // using the multi curl API, then inserts them, then reads the next set, ... jhrg 5/1/18
117  unsigned int max_handles = DmrppRequestHandler::curl_handle_pool->get_max_handles();
118  dmrpp_multi_handle *mhandle = DmrppRequestHandler::curl_handle_pool->get_multi_handle();
119 
120  // Look only at the chunks we need, found above. jhrg 4/30/18
121  while (chunks_to_read.size() > 0) {
122  queue<Chunk*> chunks_to_insert;
123  for (unsigned int i = 0; i < max_handles && chunks_to_read.size() > 0; ++i) {
124  Chunk *chunk = chunks_to_read.front();
125  chunks_to_read.pop();
126 
127  chunk->set_rbuf_to_size();
128  dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(chunk);
129  if (!handle) throw BESInternalError("No more libcurl handles.", __FILE__, __LINE__);
130 
131  mhandle->add_easy_handle(handle);
132 
133  chunks_to_insert.push(chunk);
134  }
135 
136  mhandle->read_data(); // read and decompress chunks, then remove the easy_handles
137 
138  while (chunks_to_insert.size() > 0) {
139  Chunk *chunk = chunks_to_insert.front();
140  chunks_to_insert.pop();
141 
142  chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), 1 /*elem width*/);
143 
144  insert_chunk(chunk);
145  }
146  }
147  }
148  else {
149  // This version is the 'serial' version of the code. It reads a chunk, inserts it,
150  // reads the next one, and so on.
151  while (chunks_to_read.size() > 0) {
152  Chunk *chunk = chunks_to_read.front();
153  chunks_to_read.pop();
154 
155  chunk->read_chunk();
156 
157  chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), 1 /*elem width*/);
158 
159  insert_chunk(chunk);
160  }
161  }
162 
163  set_read_p(true);
164 }
165 
166 
176 bool
178 {
179  if (read_p()) return true;
180 
181  // if there are no chunks, use read a single contiguous block of data
182  // and store it in the object. Note that DmrppCommon uses a single Chunk
183  // instance to hold 'contiguous' data.
184  if (get_chunk_dimension_sizes().empty()) {
185  // read_atomic() returns a pointer to the Chunk data. When the Chunk
186  // instance is freed, this memory goes away.
187  char *data = read_atomic(name());
188  val2buf(data); // yes, it's not type-safe
189  }
190  else {
191  // Handle the more complex case where the data is chunked.
192  read_chunks_parallel();
193  }
194 
195  return true;
196 }
197 
198 void DmrppD4Opaque::dump(ostream & strm) const
199 {
200  strm << BESIndent::LMarg << "DmrppD4Opaque::dump - (" << (void *) this << ")" << endl;
201  BESIndent::Indent();
202  DmrppCommon::dump(strm);
203  D4Opaque::dump(strm);
204  strm << BESIndent::LMarg << "value: " << "----" << /*d_buf <<*/ endl;
205  BESIndent::UnIndent();
206 }
207 
208 } // namespace dmrpp
209 
dmrpp::DmrppCommon::get_chunk_vec
virtual std::vector< Chunk > & get_chunk_vec()
Returns a reference to the internal Chunk vector.
Definition: DmrppCommon.h:82
dmrpp::DmrppD4Opaque::read
virtual bool read()
Read opaque data.
Definition: DmrppD4Opaque.cc:177
dmrpp::DmrppCommon::is_shuffle_compression
virtual bool is_shuffle_compression() const
Returns true if this object utilizes shuffle compression.
Definition: DmrppCommon.h:117
dmrpp::DmrppCommon::get_chunk_size_in_elements
virtual unsigned int get_chunk_size_in_elements() const
Get the number of elements in this chunk.
Definition: DmrppCommon.h:139
dmrpp::DmrppCommon::is_deflate_compression
virtual bool is_deflate_compression() const
Returns true if this object utilizes deflate compression.
Definition: DmrppCommon.h:107
libdap
Definition: BESDapFunctionResponseCache.h:35
BESInternalError
exception thrown if internal error encountered
Definition: BESInternalError.h:43
dmrpp::DmrppCommon::read_atomic
virtual char * read_atomic(const std::string &name)
read method for the atomic types
Definition: DmrppCommon.cc:187
dmrpp::DmrppD4Opaque::get_buf
virtual unsigned char * get_buf()
Get a pointer to start of the Opaque data buffer.
Definition: DmrppD4Opaque.h:63