bes  Updated for version 3.20.6
ArrayJoinExistingAggregation.cc
1 // This file is part of the "NcML Module" project, a BES module designed
3 // to allow NcML files to be used as a wrapper to add
4 // AIS to existing datasets of any format.
5 //
6 // Copyright (c) 2010 OPeNDAP, Inc.
7 // Author: Michael Johnson <m.johnson@opendap.org>
8 //
9 // For more information, please also see the main website: http://opendap.org/
10 //
11 // This library is free software; you can redistribute it and/or
12 // modify it under the terms of the GNU Lesser General Public
13 // License as published by the Free Software Foundation; either
14 // version 2.1 of the License, or (at your option) any later version.
15 //
16 // This library is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 // Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public
22 // License along with this library; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 //
25 // Please see the files COPYING and COPYRIGHT for more information on the LGPL.
26 //
27 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
29 
30 #include <sstream>
31 
32 #include <Marshaller.h>
33 
34 #include "BESDebug.h"
35 #include "BESStopWatch.h"
36 
37 #include "ArrayJoinExistingAggregation.h"
38 
39 #include "AggregationException.h" // agg_util
40 #include "AggregationUtil.h" // agg_util
41 #include "NCMLDebug.h"
42 
43 static const string DEBUG_CHANNEL(NCML_MODULE_DBG_CHANNEL_2);
44 static const bool PRINT_CONSTRAINTS = false;
45 
46 // Timeouts are now handled in/by the BES framework in BESInterface.
47 // jhrg 12/29/15
48 #undef USE_LOCAL_TIMEOUT_SCHEME
49 
50 namespace agg_util {
51 
53  const AMDList& memberDatasets, std::auto_ptr<ArrayGetterInterface>& arrayGetter, const Dimension& joinDim) :
54  ArrayAggregationBase(granuleTemplate, memberDatasets, arrayGetter), _joinDim(joinDim)
55 {
56  BESDEBUG_FUNC(DEBUG_CHANNEL, "Making the aggregated outer dimension be: " + joinDim.toString() + "\n");
57 
58  // We created the array with the given granule prototype, but the resulting
59  // outer dimension size must be calculated according to the
60  // value in the passed in dimension object.
61  libdap::Array::dimension& rOuterDim = *(dim_begin());
62  NCML_ASSERT_MSG(rOuterDim.name == joinDim.name, "The outer dimension name of this is not the expected "
63  "outer dimension name! Broken precondition: This ctor cannot be called "
64  "without this being true!");
65  rOuterDim.size = joinDim.size;
66  // Force it to recompute constraints since we changed size.
67  reset_constraint();
68 
69  ostringstream oss;
71  if (PRINT_CONSTRAINTS) {
72  // constraints as well to ensure reset worked.
74  }
75  BESDEBUG_FUNC(DEBUG_CHANNEL, "Constrained Dims after set are: " + oss.str());
76 }
77 
79  ArrayAggregationBase(rhs), _joinDim(rhs._joinDim)
80 {
81  duplicate(rhs);
82 }
83 
84 /* virtual */
85 ArrayJoinExistingAggregation::~ArrayJoinExistingAggregation()
86 {
87  cleanup();
88 }
89 
90 ArrayJoinExistingAggregation&
91 ArrayJoinExistingAggregation::operator=(const ArrayJoinExistingAggregation& rhs)
92 {
93  if (this != &rhs) {
94  cleanup();
95  ArrayAggregationBase::operator=(rhs);
96  duplicate(rhs);
97  }
98  return *this;
99 }
100 
101 /* virtual */
102 ArrayJoinExistingAggregation*
104 {
105  return new ArrayJoinExistingAggregation(*this);
106 }
107 
// Response-building mode used by serialize():
//   1 - pipelined: each granule's constrained slice is written to the
//       Marshaller as soon as it has been read
//       (put_vector_start / put_vector_part / put_vector_end).
//   0 - the old behavior: the entire response (for this variable) is built
//       in memory and then sent to the client via libdap::Array::serialize().
#define PIPELINING 1
111 
112 /* virtual */
113 // begin modifying here for the double buffering
114 // see notes about how this was written marked with '***'
115 // Following this method is an older version of serialize that
116 // provides no new functionality but does get run instead of the
117 // more general implementation in libdap::Array.
118 bool ArrayJoinExistingAggregation::serialize(libdap::ConstraintEvaluator &eval, libdap::DDS &dds, libdap::Marshaller &m,
119  bool ce_eval)
120 {
121  BESStopWatch sw;
122  if (BESISDEBUG(TIMING_LOG)) sw.start("ArrayJoinExistingAggregation::serialize", "");
123 
124  // *** This serialize() implementation was made by starting with a simple version that
125  // *** tested read_p(), calling read() if needed and testing send_p() and is_in_selection(),
126  // *** returning true if the data did not need to be sent. I moved that test here.
127 
128  // Only continue if we are supposed to serialize this object at all.
129  if (!(send_p() || is_in_selection())) {
130  BESDEBUG_FUNC(DEBUG_CHANNEL, "Object not in output, skipping... name=" << name() << endl);
131  return true;
132  }
133 
134  // *** Add status so that we can do our magic _or_ pass off the call to libdap
135  // *** and collect the result either way.
136  bool status = false;
137 
138  if (!read_p()) {
139  // *** copy lines from AggregationBase::read() into here in place
140  // *** of the call to read()
141 
142  if (PRINT_CONSTRAINTS) {
143  BESDEBUG_FUNC(DEBUG_CHANNEL, "Constraints on this Array are:" << endl);
144  printConstraints(*this);
145  }
146 
147  // call subclass impl
149 
150  if (PRINT_CONSTRAINTS) {
151  BESDEBUG_FUNC(DEBUG_CHANNEL, "After transfer, constraints on the member template Array are: " << endl);
153  }
154 
155  // *** Inserted code from readConstrainedGranuleArraysAndAggregateDataHook here
156 
157  // outer one is the first in iteration
158  const Array::dimension& outerDim = *(dim_begin());
159  BESDEBUG("ncml",
160  "Aggregating datasets array with outer dimension constraints: " << " start=" << outerDim.start << " stride=" << outerDim.stride << " stop=" << outerDim.stop << endl);
161 
162  try {
163 #if PIPELINING
164  // assumes the constraints are already set properly on this
165  m.put_vector_start(length());
166 #else
167  reserve_value_capacity();
168 #endif
169 
170  // Start the iteration state for the granule.
171  const AMDList& datasets = getDatasetList(); // the list
172  NCML_ASSERT(!datasets.empty());
173  int currDatasetIndex = 0; // index into datasets
174  const AggMemberDataset* pCurrDataset = (datasets[currDatasetIndex]).get();
175 
176  int outerDimIndexOfCurrDatasetHead = 0;
177  int currDatasetSize = int(pCurrDataset->getCachedDimensionSize(_joinDim.name));
178  bool currDatasetWasRead = false;
179 
180  // where in this output array we are writing next
181  unsigned int nextOutputBufferElementIndex = 0;
182 
183  // Traverse the outer dimension constraints,
184  // Keeping track of which dataset we need to
185  // be inside for the given values of the constraint.
186  for (int outerDimIndex = outerDim.start; outerDimIndex <= outerDim.stop && outerDimIndex < outerDim.size;
187  outerDimIndex += outerDim.stride) {
188  // Figure out where the given outer index maps into in local granule space
189  int localGranuleIndex = outerDimIndex - outerDimIndexOfCurrDatasetHead;
190 
191  // if this is beyond the dataset end, move state to the next dataset
192  // and try again until we're in the proper interval, with proper dataset.
193  while (localGranuleIndex >= currDatasetSize) {
194  localGranuleIndex -= currDatasetSize;
195  outerDimIndexOfCurrDatasetHead += currDatasetSize;
196  ++currDatasetIndex;
197  NCML_ASSERT(currDatasetIndex < int(datasets.size()));
198  pCurrDataset = datasets[currDatasetIndex].get();
199  currDatasetSize = pCurrDataset->getCachedDimensionSize(_joinDim.name);
200  currDatasetWasRead = false;
201 
202  BESDEBUG_FUNC(DEBUG_CHANNEL,
203  "The constraint traversal passed a granule boundary " << "on the outer dimension and is stepping forward into " << "granule index=" << currDatasetIndex << endl);
204  }
205 
206  // If we haven't read in this granule yet (we passed a boundary)
207  // then do it now. Map constraints into the local granule space.
208  if (!currDatasetWasRead) {
209  BESDEBUG_FUNC(DEBUG_CHANNEL,
210  " Current granule dataset was traversed but not yet " "read and copied into output. Mapping constraints " "and calling read()..." << endl);
211 
212  // Set up a constraint object for the actual granule read
213  // so that it only loads the data values in which we are
214  // interested.
215  Array& granuleConstraintTemplate = getGranuleTemplateArray();
216 
217  // The inner dim constraints were set up in the containing read() call.
218  // The outer dim was left open for us to fix now...
219  Array::Dim_iter outerDimIt = granuleConstraintTemplate.dim_begin();
220 
221  // modify the outerdim size to match the dataset we need to
222  // load. The inners MUST match so we can let those get
223  //checked later...
224  outerDimIt->size = currDatasetSize;
225  outerDimIt->c_size = currDatasetSize; // this will get recalc below?
226 
227  // find the mapped endpoint
228  // Basically, the fullspace endpoint mapped to local offset,
229  // clamped into the local granule size.
230  int granuleStopIndex = std::min(outerDim.stop - outerDimIndexOfCurrDatasetHead,
231  currDatasetSize - 1);
232 
233  // we must clamp the stride to the interval of the
234  // dataset in order to avoid an exception in
235  // add_constraint on stride being larger than dataset.
236  int clampedStride = std::min(outerDim.stride, currDatasetSize);
237  // mapped endpoint clamped within this granule
238  granuleConstraintTemplate.add_constraint(outerDimIt, localGranuleIndex, clampedStride,
239  granuleStopIndex);
240 #if USE_LOCAL_TIMEOUT_SCHEME
241  dds.timeout_on();
242 #endif
243 #if 0
244  // Do the constrained read and copy it into this output buffer
245  agg_util::AggregationUtil::addDatasetArrayDataToAggregationOutputArray(*this,// into the output buffer of this object
246  nextOutputBufferElementIndex,// into the next open slice
247  getGranuleTemplateArray(),// constraints we just setup
248  name(),// aggvar name
249  const_cast<AggMemberDataset&>(*pCurrDataset),// Dataset who's DDS should be searched
250  getArrayGetterInterface(), DEBUG_CHANNEL);
251 #endif
252 
254  getGranuleTemplateArray(), name(), const_cast<AggMemberDataset&>(*pCurrDataset),
255  getArrayGetterInterface(), DEBUG_CHANNEL);
256 #if USE_LOCAL_TIMEOUT_SCHEME
257  dds.timeout_off();
258 #endif
259 
260 #if PIPELINING
261  m.put_vector_part(pDatasetArray->get_buf(), getGranuleTemplateArray().length(), var()->width(),
262  var()->type());
263 #else
264  this->set_value_slice_from_row_major_vector(*pDatasetArray, nextOutputBufferElementIndex);
265 #endif
266 
267  pDatasetArray->clear_local_data();
268 
269  // Jump output buffer index forward by the amount we added.
270  nextOutputBufferElementIndex += getGranuleTemplateArray().length();
271  currDatasetWasRead = true;
272 
273  BESDEBUG_FUNC(DEBUG_CHANNEL,
274  " The granule index " << currDatasetIndex << " was read with constraints and copied into the aggregation output." << endl);
275  } // !currDatasetWasRead
276  } // for loop over outerDim
277  } // end of try
278  catch (AggregationException& ex) {
279  THROW_NCML_PARSE_ERROR(-1, ex.what());
280  }
281 
282  // *** end of code inserted from readConstrainedGranuleArraysAndAggregateDataHook
283 
284 #if PIPELINING
285  m.put_vector_end();
286  status = true;
287 #else
288  set_read_p(true);
289  status = libdap::Array::serialize(eval, dds, m, ce_eval);
290 #endif
291  }
292  else {
293  status = libdap::Array::serialize(eval, dds, m, ce_eval);
294  }
295 
296  return status;
297 }
298 
300 // Private Impl Below
301 
302 void ArrayJoinExistingAggregation::duplicate(const ArrayJoinExistingAggregation& rhs)
303 {
304  _joinDim = rhs._joinDim;
305 }
306 
307 void ArrayJoinExistingAggregation::cleanup() throw ()
308 {
309 }
310 
311 /* virtual */
313 {
314  // transfer the constraints from this object into the subArray template
315  // skipping our first dim which is the join dim we need to do specially every
316  // granule in the read hook.
318  *this, // from this
319  true, // skip first dim in the copy since we handle it special
320  true, // also skip it in the toArray for the same reason.
321  true, // print debug
322  DEBUG_CHANNEL); // on this channel
323 }
324 
326 {
327  BESStopWatch sw;
328  if (BESISDEBUG(TIMING_LOG))
329  sw.start("ArrayJoinExistingAggregation::readConstrainedGranuleArraysAndAggregateDataHook", "");
330 
331  // outer one is the first in iteration
332  const Array::dimension& outerDim = *(dim_begin());
333  BESDEBUG("ncml",
334  "Aggregating datasets array with outer dimension constraints: " << " start=" << outerDim.start << " stride=" << outerDim.stride << " stop=" << outerDim.stop << endl);
335 
336  try {
337  // assumes the constraints are already set properly on this
338  reserve_value_capacity();
339 
340  // Start the iteration state for the granule.
341  const AMDList& datasets = getDatasetList(); // the list
342  NCML_ASSERT(!datasets.empty());
343  int currDatasetIndex = 0; // index into datasets
344  const AggMemberDataset* pCurrDataset = (datasets[currDatasetIndex]).get();
345 
346  int outerDimIndexOfCurrDatasetHead = 0;
347  int currDatasetSize = int(pCurrDataset->getCachedDimensionSize(_joinDim.name));
348  bool currDatasetWasRead = false;
349 
350  // where in this output array we are writing next
351  unsigned int nextOutputBufferElementIndex = 0;
352 
353  // Traverse the outer dimension constraints,
354  // Keeping track of which dataset we need to
355  // be inside for the given values of the constraint.
356  for (int outerDimIndex = outerDim.start; outerDimIndex <= outerDim.stop && outerDimIndex < outerDim.size;
357  outerDimIndex += outerDim.stride) {
358  // Figure out where the given outer index maps into in local granule space
359  int localGranuleIndex = outerDimIndex - outerDimIndexOfCurrDatasetHead;
360 
361  // if this is beyond the dataset end, move state to the next dataset
362  // and try again until we're in the proper interval, with proper dataset.
363  while (localGranuleIndex >= currDatasetSize) {
364  localGranuleIndex -= currDatasetSize;
365  outerDimIndexOfCurrDatasetHead += currDatasetSize;
366  ++currDatasetIndex;
367  NCML_ASSERT(currDatasetIndex < int(datasets.size()));
368  pCurrDataset = datasets[currDatasetIndex].get();
369  currDatasetSize = pCurrDataset->getCachedDimensionSize(_joinDim.name);
370  currDatasetWasRead = false;
371 
372  BESDEBUG_FUNC(DEBUG_CHANNEL,
373  "The constraint traversal passed a granule boundary " << "on the outer dimension and is stepping forward into " << "granule index=" << currDatasetIndex << endl);
374  }
375 
376  // If we haven't read in this granule yet (we passed a boundary)
377  // then do it now. Map constraints into the local granule space.
378  if (!currDatasetWasRead) {
379  BESDEBUG_FUNC(DEBUG_CHANNEL,
380  " Current granule dataset was traversed but not yet " "read and copied into output. Mapping constraints " "and calling read()..." << endl);
381 
382  // Set up a constraint object for the actual granule read
383  // so that it only loads the data values in which we are
384  // interested.
385  Array& granuleConstraintTemplate = getGranuleTemplateArray();
386 
387  // The inner dim constraints were set up in the containing read() call.
388  // The outer dim was left open for us to fix now...
389  Array::Dim_iter outerDimIt = granuleConstraintTemplate.dim_begin();
390 
391  // modify the outerdim size to match the dataset we need to
392  // load. The inners MUST match so we can let those get
393  //checked later...
394  outerDimIt->size = currDatasetSize;
395  outerDimIt->c_size = currDatasetSize; // this will get recalc below?
396 
397  // find the mapped endpoint
398  // Basically, the fullspace endpoint mapped to local offset,
399  // clamped into the local granule size.
400  int granuleStopIndex = std::min(outerDim.stop - outerDimIndexOfCurrDatasetHead, currDatasetSize - 1);
401 
402  // we must clamp the stride to the interval of the
403  // dataset in order to avoid an exception in
404  // add_constraint on stride being larger than dataset.
405  int clampedStride = std::min(outerDim.stride, currDatasetSize);
406  // mapped endpoint clamped within this granule
407  granuleConstraintTemplate.add_constraint(outerDimIt, localGranuleIndex, clampedStride, granuleStopIndex);
408 
409  // Do the constrained read and copy it into this output buffer
410  agg_util::AggregationUtil::addDatasetArrayDataToAggregationOutputArray(*this, // into the output buffer of this object
411  nextOutputBufferElementIndex, // into the next open slice
412  getGranuleTemplateArray(), // constraints we just setup
413  name(), // aggvar name
414  const_cast<AggMemberDataset&>(*pCurrDataset), // Dataset who's DDS should be searched
415  getArrayGetterInterface(), DEBUG_CHANNEL);
416 
417  // Jump output buffer index forward by the amount we added.
418  nextOutputBufferElementIndex += getGranuleTemplateArray().length();
419  currDatasetWasRead = true;
420 
421  BESDEBUG_FUNC(DEBUG_CHANNEL,
422  " The granule index " << currDatasetIndex << " was read with constraints and copied into the aggregation output." << endl);
423  } // !currDatasetWasRead
424  } // for loop over outerDim
425  } // try
426 
427  catch (AggregationException& ex) {
428  THROW_NCML_PARSE_ERROR(-1, ex.what());
429  }
430 
431 }
432 
433 } // namespace agg_util
agg_util::ArrayAggregationBase::printConstraints
void printConstraints(const Array &fromArray)
Definition: ArrayAggregationBase.cc:143
BESStopWatch::start
virtual bool start(std::string name)
Definition: BESStopWatch.cc:58
agg_util::ArrayJoinExistingAggregation
Definition: ArrayJoinExistingAggregation.h:44
agg_util::ArrayJoinExistingAggregation::ArrayJoinExistingAggregation
ArrayJoinExistingAggregation(const libdap::Array &granuleTemplate, const AMDList &memberDatasets, std::auto_ptr< ArrayGetterInterface > &arrayGetter, const Dimension &joinDim)
Definition: ArrayJoinExistingAggregation.cc:52
agg_util::Dimension
Definition: Dimension.h:49
agg_util::ArrayAggregationBase::getGranuleTemplateArray
libdap::Array & getGranuleTemplateArray()
Definition: ArrayAggregationBase.cc:151
agg_util::AggregationUtil::addDatasetArrayDataToAggregationOutputArray
static void addDatasetArrayDataToAggregationOutputArray(libdap::Array &oOutputArray, unsigned int atIndex, const libdap::Array &constrainedTemplateArray, const string &varName, AggMemberDataset &dataset, const ArrayGetterInterface &arrayGetter, const string &debugChannel)
Definition: AggregationUtil.cc:920
agg_util::ArrayJoinExistingAggregation::readConstrainedGranuleArraysAndAggregateDataHook
virtual void readConstrainedGranuleArraysAndAggregateDataHook()
Definition: ArrayJoinExistingAggregation.cc:325
agg_util::AggregationUtil::transferArrayConstraints
static void transferArrayConstraints(libdap::Array *pToArray, const libdap::Array &fromArray, bool skipFirstFromDim, bool skipFirstToDim, bool printDebug=false, const std::string &debugChannel="agg_util")
Definition: AggregationUtil.cc:732
agg_util::AggregationUtil::printDimensions
static void printDimensions(std::ostream &os, const libdap::Array &fromArray)
Definition: AggregationUtil.cc:690
agg_util
Helper class for temporarily hijacking an existing dhi to load a DDX response for one particular file...
Definition: AggMemberDataset.cc:38
agg_util::AggregationUtil::printConstraints
static void printConstraints(std::ostream &os, const libdap::Array &fromArray)
Definition: AggregationUtil.cc:706
agg_util::ArrayAggregationBase
Definition: ArrayAggregationBase.h:49
agg_util::ArrayJoinExistingAggregation::transferOutputConstraintsIntoGranuleTemplateHook
virtual void transferOutputConstraintsIntoGranuleTemplateHook()
Definition: ArrayJoinExistingAggregation.cc:312
agg_util::ArrayAggregationBase::getArrayGetterInterface
const ArrayGetterInterface & getArrayGetterInterface() const
Definition: ArrayAggregationBase.cc:158
agg_util::AggMemberDataset::getCachedDimensionSize
virtual unsigned int getCachedDimensionSize(const std::string &dimName) const =0
BESStopWatch
Definition: BESStopWatch.h:55
agg_util::Dimension::toString
std::string toString() const
Definition: Dimension.cc:55
agg_util::AggregationException
Definition: AggregationException.h:41
agg_util::AggregationUtil::readDatasetArrayDataForAggregation
static libdap::Array * readDatasetArrayDataForAggregation(const libdap::Array &constrainedTemplateArray, const std::string &varName, AggMemberDataset &dataset, const ArrayGetterInterface &arrayGetter, const std::string &debugChannel)
Definition: AggregationUtil.cc:869
agg_util::ArrayJoinExistingAggregation::ptr_duplicate
virtual ArrayJoinExistingAggregation * ptr_duplicate()
Definition: ArrayJoinExistingAggregation.cc:103
agg_util::AggMemberDataset
Definition: AggMemberDataset.h:63
agg_util::ArrayAggregationBase::getDatasetList
const AMDList & getDatasetList() const
Definition: ArrayAggregationBase.cc:136