32 #include <BESInternalError.h>
35 #include "CurlHandlePool.h"
36 #include "DmrppRequestHandler.h"
37 #include "DmrppD4Opaque.h"
// Declarator of DmrppD4Opaque::_duplicate, which takes a const reference to
// another DmrppD4Opaque. The parameter is unnamed, so the function body cannot
// refer to the source object by name.
// NOTE(review): the return type and the body are not visible in this excerpt —
// confirm against the full file before relying on what this helper copies.
45 DmrppD4Opaque::_duplicate(
const DmrppD4Opaque &)
// Constructor taking a single string n. It forwards n to the D4Opaque base
// constructor and default-constructs the DmrppCommon base.
// NOTE(review): n is presumably the variable's name — confirm against D4Opaque.
// NOTE(review): the constructor body is not visible in this excerpt.
49 DmrppD4Opaque::DmrppD4Opaque(
const string &n) : D4Opaque(n), DmrppCommon()
// Constructor taking two strings. Both n and d are forwarded, in that order, to
// the two-argument D4Opaque base constructor; the DmrppCommon base is
// default-constructed.
// NOTE(review): n is presumably the variable name and d the dataset name —
// confirm against the D4Opaque(n, d) constructor.
// NOTE(review): the constructor body is not visible in this excerpt.
53 DmrppD4Opaque::DmrppD4Opaque(
const string &n,
const string &d) : D4Opaque(n, d), DmrppCommon()
// Returns a pointer to a new, heap-allocated DmrppD4Opaque that is
// copy-constructed from this object. The raw pointer produced by `new` is
// returned directly; nothing in this function retains or frees it.
// NOTE(review): the declared return type is not visible in this excerpt.
58 DmrppD4Opaque::ptr_duplicate()
60 return new DmrppD4Opaque(*
this);
// Copy constructor: copy-constructs both the D4Opaque base and the DmrppCommon
// base from rhs.
// NOTE(review): the constructor body is not visible in this excerpt, so any
// additional copying it performs cannot be confirmed here.
63 DmrppD4Opaque::DmrppD4Opaque(
const DmrppD4Opaque &rhs) : D4Opaque(rhs), DmrppCommon(rhs)
// Copy assignment from rhs.
// NOTE(review): the return type, any self-assignment guard and the return
// statement are not visible in this excerpt — confirm against the full file.
69 DmrppD4Opaque::operator=(
const DmrppD4Opaque &rhs)
// View this object as its D4Opaque base and assign rhs to it, which runs the
// D4Opaque assignment operator on the base-class part.
74 dynamic_cast<D4Opaque &
>(*this) = rhs;
// Hand rhs to DmrppCommon::m_duplicate_common(); presumably this copies the
// DmrppCommon state from rhs — verify in DmrppCommon.
77 DmrppCommon::m_duplicate_common(rhs);
82 void DmrppD4Opaque::insert_chunk(Chunk *chunk)
85 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
86 if (chunk_shape.size() != 1)
throw BESInternalError(
"Opaque variables' chunks can only have one dimension.", __FILE__, __LINE__);
89 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
91 char *source_buffer = chunk->get_rbuf();
92 unsigned char *target_buffer =
get_buf();
94 memcpy(target_buffer + chunk_origin[0], source_buffer, chunk_shape[0]);
// Reads the chunks listed in chunk_refs. When
// DmrppRequestHandler::d_use_parallel_transfers is set, chunks are handed to a
// multi handle from the shared curl handle pool in batches of at most
// max_handles and transferred with one read_data() call per batch.
// Throws BESInternalError when chunk_refs is empty or when the pool returns no
// easy handle for a chunk.
// NOTE(review): several statements of this function are not visible in this
// excerpt — the declaration of chunk_refs, what is done with each chunk popped
// in the two drain loops, and the closing of the branches. Confirm against the
// full file before changing anything here.
97 void DmrppD4Opaque::read_chunks_parallel()
// A variable with no chunks is treated as an internal error; the message
// includes the variable's name().
100 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
// Work queue of pointers to the chunks that still have to be read.
105 queue<Chunk*> chunks_to_read;
// Queue the address of every element of chunk_refs. The queue holds pointers
// into the vector itself, not copies of the chunks.
108 for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c) {
109 chunks_to_read.push(&*c);
114 if (DmrppRequestHandler::d_use_parallel_transfers) {
// Batch-size limit and the multi handle both come from the shared pool.
117 unsigned int max_handles = DmrppRequestHandler::curl_handle_pool->get_max_handles();
118 dmrpp_multi_handle *mhandle = DmrppRequestHandler::curl_handle_pool->get_multi_handle();
// Keep forming batches until the work queue is empty.
121 while (chunks_to_read.size() > 0) {
// Chunks added to the multi handle during this batch.
122 queue<Chunk*> chunks_to_insert;
// Take up to max_handles chunks off the work queue, stopping early if it
// runs out.
123 for (
unsigned int i = 0; i < max_handles && chunks_to_read.size() > 0; ++i) {
124 Chunk *chunk = chunks_to_read.front();
125 chunks_to_read.pop();
// set_rbuf_to_size() is called before a handle is requested for the chunk;
// presumably it sizes the chunk's read buffer — verify in Chunk.
127 chunk->set_rbuf_to_size();
128 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(chunk);
// A null handle from the pool is reported as an internal error.
129 if (!handle)
throw BESInternalError(
"No more libcurl handles.", __FILE__, __LINE__);
131 mhandle->add_easy_handle(handle);
// Remember the chunk so it can be visited again after the batch is read.
133 chunks_to_insert.push(chunk);
// One read_data() call per batch; presumably this performs the transfers
// for the easy handles added above — verify in dmrpp_multi_handle.
136 mhandle->read_data();
// Drain this batch's chunks.
// NOTE(review): what is done with each popped chunk is not visible here.
138 while (chunks_to_insert.size() > 0) {
139 Chunk *chunk = chunks_to_insert.front();
140 chunks_to_insert.pop();
// Drain the work queue one chunk at a time.
// NOTE(review): presumably this is the non-parallel branch; neither the
// enclosing branch nor the per-chunk processing is visible in this excerpt.
151 while (chunks_to_read.size() > 0) {
152 Chunk *chunk = chunks_to_read.front();
153 chunks_to_read.pop();
// NOTE(review): the header of the function containing these statements is not
// visible in this excerpt; `return true` shows that it returns a value.
// Return immediately when read_p() reports true.
179 if (read_p())
return true;
// Branch on whether this variable has any chunk dimension sizes.
// NOTE(review): the statements executed when the list is empty are not visible
// in this excerpt.
184 if (get_chunk_dimension_sizes().empty()) {
// Read the variable's chunks.
// NOTE(review): presumably this runs only when the list above is not empty —
// the branch structure is not visible here.
192 read_chunks_parallel();
// Writes a textual description of this object to strm. The output is a header
// line with the class/method name and this object's address, then whatever
// DmrppCommon::dump() and D4Opaque::dump() write, then a "value: ----" line.
// The literal "----" is written in place of the value; the data are not printed.
// Lines written here are prefixed with BESIndent::LMarg.
// NOTE(review): BESIndent::UnIndent() is called at the end but the matching
// BESIndent::Indent() call is not visible in this excerpt — confirm it exists.
198 void DmrppD4Opaque::dump(ostream & strm)
const
200 strm << BESIndent::LMarg <<
"DmrppD4Opaque::dump - (" << (
void *)
this <<
")" << endl;
// Delegate to both base classes, DmrppCommon first.
202 DmrppCommon::dump(strm);
203 D4Opaque::dump(strm);
// Placeholder for the value.
204 strm << BESIndent::LMarg <<
"value: " <<
"----" << endl;
205 BESIndent::UnIndent();