43 #include <D4EnumDefs.h>
44 #include <D4Attributes.h>
49 #include "BESInternalError.h"
52 #include "CurlHandlePool.h"
54 #include "DmrppArray.h"
55 #include "DmrppRequestHandler.h"
58 static const string dmrpp_3 =
"dmrpp:3";
59 static const string dmrpp_4 =
"dmrpp:4";
64 #define MB (1024*1024)
68 void DmrppArray::_duplicate(
const DmrppArray &)
72 DmrppArray::DmrppArray(
const string &n, BaseType *v) :
73 Array(n, v, true ), DmrppCommon()
77 DmrppArray::DmrppArray(
const string &n,
const string &d, BaseType *v) :
78 Array(n, d, v, true), DmrppCommon()
83 DmrppArray::ptr_duplicate()
85 return new DmrppArray(*
this);
88 DmrppArray::DmrppArray(
const DmrppArray &rhs) :
89 Array(rhs), DmrppCommon(rhs)
95 DmrppArray::operator=(
const DmrppArray &rhs)
97 if (
this == &rhs)
return *
this;
99 dynamic_cast<Array &
>(*this) = rhs;
102 DmrppCommon::m_duplicate_common(rhs);
111 bool DmrppArray::is_projected()
113 for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
114 if (dimension_size(p,
true) != dimension_size(p,
false))
return true;
137 static unsigned long long get_index(
const vector<unsigned int> &address_in_target,
const vector<unsigned int> &target_shape)
139 assert(address_in_target.size() == target_shape.size());
141 vector<unsigned int>::const_reverse_iterator shape_index = target_shape.rbegin();
142 vector<unsigned int>::const_reverse_iterator index = address_in_target.rbegin(), index_end = address_in_target.rend();
144 unsigned long long multiplier = *shape_index++;
145 unsigned long long offset = *index++;
147 while (index != index_end) {
148 assert(*index < *shape_index);
150 offset += multiplier * *index++;
151 multiplier *= *shape_index++;
166 unsigned long long size = 1;
167 for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
168 size *= dimension_size(dim, constrained);
181 Dim_iter dim = dim_begin(), edim = dim_end();
182 vector<unsigned int> shape;
186 shape.reserve(edim - dim);
188 for (; dim != edim; dim++) {
189 shape.push_back(dimension_size(dim, constrained));
200 DmrppArray::dimension DmrppArray::get_dimension(
unsigned int i)
202 assert(i <= (dim_end() - dim_begin()));
203 return *(dim_begin() + i);
211 void DmrppArray::insert_constrained_contiguous(Dim_iter dimIter,
unsigned long *target_index, vector<unsigned int> &subsetAddress,
212 const vector<unsigned int> &array_shape,
char *src_buf)
214 BESDEBUG(
"dmrpp",
"DmrppArray::"<< __func__ <<
"() - subsetAddress.size(): " << subsetAddress.size() << endl);
216 unsigned int bytesPerElt = prototype()->width();
218 char *dest_buf = get_buf();
220 unsigned int start = this->dimension_start(dimIter,
true);
221 unsigned int stop = this->dimension_stop(dimIter,
true);
222 unsigned int stride = this->dimension_stride(dimIter,
true);
228 if (dimIter == dim_end() && stride == 1) {
230 subsetAddress.push_back(start);
231 unsigned long start_index = get_index(subsetAddress, array_shape);
232 subsetAddress.pop_back();
234 subsetAddress.push_back(stop);
235 unsigned long stop_index = get_index(subsetAddress, array_shape);
236 subsetAddress.pop_back();
240 for (
unsigned long sourceIndex = start_index; sourceIndex <= stop_index; sourceIndex++) {
241 unsigned long target_byte = *target_index * bytesPerElt;
242 unsigned long source_byte = sourceIndex * bytesPerElt;
244 for (
unsigned long i = 0; i < bytesPerElt; i++) {
245 dest_buf[target_byte++] = src_buf[source_byte++];
251 for (
unsigned int myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {
254 if (dimIter != dim_end()) {
257 subsetAddress.push_back(myDimIndex);
258 insert_constrained_contiguous(dimIter, target_index, subsetAddress, array_shape, src_buf);
259 subsetAddress.pop_back();
264 subsetAddress.push_back(myDimIndex);
265 unsigned int sourceIndex = get_index(subsetAddress, array_shape);
266 subsetAddress.pop_back();
269 unsigned long target_byte = *target_index * bytesPerElt;
270 unsigned long source_byte = sourceIndex * bytesPerElt;
272 for (
unsigned int i = 0; i < bytesPerElt; i++) {
273 dest_buf[target_byte++] = src_buf[source_byte++];
286 void *one_chunk_unconstrained_thread(
void *arg_list)
288 one_chunk_unconstrained_args *args =
reinterpret_cast<one_chunk_unconstrained_args*
>(arg_list);
291 process_one_chunk_unconstrained(args->chunk, args->array, args->array_shape, args->chunk_shape);
294 write(args->fds[1], &args->tid,
sizeof(args->tid));
296 pthread_exit(
new string(error.get_verbose_message()));
301 write(args->fds[1], &args->tid,
sizeof(args->tid));
315 void *one_child_chunk_thread(
void *arg_list)
317 one_child_chunk_args *args =
reinterpret_cast<one_child_chunk_args*
>(arg_list);
320 args->child_chunk->read_chunk();
322 assert(args->master_chunk->get_rbuf());
323 assert(args->child_chunk->get_rbuf());
324 assert(args->child_chunk->get_bytes_read() == args->child_chunk->get_size());
337 unsigned int offset_within_master_chunk = args->child_chunk->get_offset() - args->master_chunk->get_offset();
339 memcpy(args->master_chunk->get_rbuf() + offset_within_master_chunk, args->child_chunk->get_rbuf(), args->child_chunk->get_bytes_read());
342 write(args->fds[1], &args->tid,
sizeof(args->tid));
345 pthread_exit(
new string(error.get_verbose_message()));
350 write(args->fds[1], &args->tid,
sizeof(args->tid));
361 void DmrppArray::read_contiguous()
369 if (chunk_refs.size() != 1)
throw BESInternalError(
string(
"Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
372 Chunk &master_chunk = chunk_refs[0];
374 unsigned long long master_chunk_size = master_chunk.get_size();
379 if (DmrppRequestHandler::d_use_parallel_transfers && master_chunk_size > DmrppRequestHandler::d_min_size) {
383 master_chunk.set_rbuf_to_size();
388 unsigned int num_chunks = floor(master_chunk_size/MB);
389 if ( num_chunks >= DmrppRequestHandler::d_max_parallel_transfers)
390 num_chunks = DmrppRequestHandler::d_max_parallel_transfers;
394 int status = pipe(fds);
396 throw BESInternalError(
string(
"Could not open a pipe for thread communication: ").append(strerror(errno)), __FILE__, __LINE__);
399 unsigned long long chunk_size = master_chunk_size / num_chunks;
400 unsigned long long chunk_offset = master_chunk.get_offset();
404 unsigned int chunk_remainder = master_chunk.get_size() % num_chunks;
406 string chunk_url = master_chunk.get_data_url();
409 queue<Chunk *> chunks_to_read;
411 for (
unsigned int i = 0; i < num_chunks-1; i++) {
412 chunks_to_read.push(
new Chunk(chunk_url, chunk_size, (chunk_size * i) + chunk_offset));
415 chunks_to_read.push(
new Chunk(chunk_url, chunk_size + chunk_remainder, (chunk_size * (num_chunks-1)) + chunk_offset));
418 pthread_t threads[DmrppRequestHandler::d_max_parallel_transfers];
419 memset(&threads[0], 0,
sizeof(pthread_t) * DmrppRequestHandler::d_max_parallel_transfers);
422 unsigned int num_threads = 0;
425 for (
unsigned int i = 0; i < (
unsigned int) DmrppRequestHandler::d_max_parallel_transfers && chunks_to_read.size() > 0; ++i) {
426 Chunk *current_chunk = chunks_to_read.front();
427 chunks_to_read.pop();
430 one_child_chunk_args *args =
new one_child_chunk_args(fds, i, current_chunk, &master_chunk);
431 int status = pthread_create(&threads[i], NULL, dmrpp::one_child_chunk_thread, (
void*) args);
435 BESDEBUG(dmrpp_3,
"started thread: " << i << endl);
438 ostringstream oss(
"Could not start process_one_chunk_unconstrained thread for master_chunk ", std::ios::ate);
439 oss << i <<
": " << strerror(status);
440 BESDEBUG(dmrpp_3, oss.str());
446 while (num_threads > 0) {
449 int bytes =
::read(fds[0], &tid,
sizeof(tid));
450 if (bytes !=
sizeof(tid))
451 throw BESInternalError(
string(
"Could not read the thread id: ").append(strerror(errno)), __FILE__, __LINE__);
453 if (!(tid < DmrppRequestHandler::d_max_parallel_transfers)) {
454 ostringstream oss(
"Invalid thread id read after thread exit: ", std::ios::ate);
460 int status = pthread_join(threads[tid], (
void**)&error);
462 BESDEBUG(dmrpp_3,
"joined thread: " << (
unsigned int)tid <<
", there are: " << num_threads << endl);
465 ostringstream oss(
"Could not join process_one_chunk_unconstrained thread for master_chunk ", std::ios::ate);
466 oss << tid <<
": " << strerror(status);
469 else if (error != 0) {
474 else if (chunks_to_read.size() > 0) {
475 Chunk *current_chunk = chunks_to_read.front();
476 chunks_to_read.pop();
479 one_child_chunk_args *args =
new one_child_chunk_args(fds, tid, current_chunk, &master_chunk);
480 int status = pthread_create(&threads[tid], NULL, dmrpp::one_child_chunk_thread, (
void*) args);
484 oss <<
"Could not start process_one_chunk_unconstrained thread for master_chunk " << tid <<
": " << strerror(status);
488 BESDEBUG(dmrpp_3,
"started thread: " << (
unsigned int)tid <<
", there are: " << threads << endl);
499 join_threads(threads, DmrppRequestHandler::d_max_parallel_transfers);
509 master_chunk.read_chunk();
516 if (!is_projected()) {
517 val2buf(master_chunk.get_rbuf());
520 vector<unsigned int> array_shape =
get_shape(
false);
523 reserve_value_capacity(
get_size(
true));
524 unsigned long target_index = 0;
525 vector<unsigned int> subset;
527 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, master_chunk.get_rbuf());
545 unsigned long long DmrppArray::get_chunk_start(
const dimension &thisDim,
unsigned int chunk_origin)
548 unsigned long long first_element_offset = 0;
549 if ((
unsigned) (thisDim.start) < chunk_origin) {
551 if (thisDim.stride != 1) {
553 first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
555 if (first_element_offset != 0) {
557 first_element_offset = thisDim.stride - first_element_offset;
562 first_element_offset = thisDim.start - chunk_origin;
565 return first_element_offset;
568 #ifdef USE_READ_SERIAL
590 void DmrppArray::insert_chunk_serial(
unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
593 BESDEBUG(
"dmrpp", __func__ <<
" dim: "<< dim <<
" BEGIN "<< endl);
596 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
599 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
601 dimension thisDim = this->get_dimension(dim);
604 if ((
unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (
unsigned) thisDim.stop < chunk_origin[dim]) {
609 unsigned int first_element_offset = get_chunk_start(dim, chunk_origin);
612 if (first_element_offset > chunk_shape[dim]) {
617 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
618 if ((
unsigned) thisDim.stop < end_element) {
619 end_element = thisDim.stop;
622 unsigned long long chunk_start = first_element_offset;
623 unsigned long long chunk_end = end_element - chunk_origin[dim];
624 vector<unsigned int> constrained_array_shape =
get_shape(
true);
626 unsigned int last_dim = chunk_shape.size() - 1;
627 if (dim == last_dim) {
633 char *source_buffer = chunk->get_rbuf();
634 char *target_buffer = get_buf();
635 unsigned int elem_width = prototype()->width();
637 if (thisDim.stride == 1) {
639 unsigned long long start_element = chunk_origin[dim] + first_element_offset;
641 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
644 (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
646 (*chunk_element_address)[dim] = first_element_offset;
648 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
649 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
651 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
655 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
657 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
660 (*chunk_element_address)[dim] = chunk_index;
662 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
663 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
665 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
671 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
672 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
673 (*chunk_element_address)[dim] = chunk_index;
676 insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
681 void DmrppArray::read_chunks_serial()
683 BESDEBUG(
"dmrpp", __func__ <<
" for variable '" << name() <<
"' - BEGIN" << endl);
686 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
689 reserve_value_capacity(
get_size(
true));
697 for (
unsigned long i = 0; i < chunk_refs.size(); i++) {
698 Chunk &chunk = chunk_refs[i];
700 vector<unsigned int> chunk_source_address(dimensions(), 0);
701 vector<unsigned int> target_element_address = chunk.get_position_in_array();
704 insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
709 BESDEBUG(
"dmrpp",
"DmrppArray::"<< __func__ <<
"() for " << name() <<
" END"<< endl);
735 DmrppArray::find_needed_chunks(
unsigned int dim, vector<unsigned int> *target_element_address, Chunk *chunk)
737 BESDEBUG(dmrpp_3, __func__ <<
" BEGIN, dim: " << dim << endl);
740 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
743 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
745 dimension thisDim = this->get_dimension(dim);
748 if ((
unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (
unsigned) thisDim.stop < chunk_origin[dim]) {
753 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
756 if (chunk_start > chunk_shape[dim]) {
761 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
762 if ((
unsigned) thisDim.stop < end_element) {
763 end_element = thisDim.stop;
766 unsigned long long chunk_end = end_element - chunk_origin[dim];
768 unsigned int last_dim = chunk_shape.size() - 1;
769 if (dim == last_dim) {
774 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
775 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
778 Chunk *needed = find_needed_chunks(dim + 1, target_element_address, chunk);
779 if (needed)
return needed;
805 void DmrppArray::insert_chunk(
unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
806 Chunk *chunk,
const vector<unsigned int> &constrained_array_shape)
809 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
812 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
814 dimension thisDim = this->get_dimension(dim);
817 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
820 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
821 if ((
unsigned) thisDim.stop < end_element) {
822 end_element = thisDim.stop;
825 unsigned long long chunk_end = end_element - chunk_origin[dim];
827 unsigned int last_dim = chunk_shape.size() - 1;
828 if (dim == last_dim) {
829 char *source_buffer = chunk->get_rbuf();
830 char *target_buffer = get_buf();
831 unsigned int elem_width = prototype()->width();
833 if (thisDim.stride == 1) {
835 unsigned long long start_element = chunk_origin[dim] + chunk_start;
837 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
840 (*target_element_address)[dim] = (start_element - thisDim.start);
842 (*chunk_element_address)[dim] = chunk_start;
845 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
846 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
848 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
852 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
854 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
857 (*chunk_element_address)[dim] = chunk_index;
860 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
861 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
863 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
869 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
870 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
871 (*chunk_element_address)[dim] = chunk_index;
874 insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape);
885 void DmrppArray::read_chunks()
888 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
892 queue<Chunk *> chunks_to_read;
895 for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c) {
898 vector<unsigned int> target_element_address = chunk.get_position_in_array();
899 Chunk *needed = find_needed_chunks(0 , &target_element_address, &chunk);
900 if (needed) chunks_to_read.push(needed);
903 reserve_value_capacity(
get_size(
true));
904 vector<unsigned int> constrained_array_shape =
get_shape(
true);
906 BESDEBUG(dmrpp_3,
"d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
907 BESDEBUG(dmrpp_3,
"d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);
909 if (DmrppRequestHandler::d_use_parallel_transfers) {
912 unsigned int max_handles = DmrppRequestHandler::curl_handle_pool->get_max_handles();
913 dmrpp_multi_handle *mhandle = DmrppRequestHandler::curl_handle_pool->get_multi_handle();
916 while (chunks_to_read.size() > 0) {
917 queue<Chunk*> chunks_to_insert;
918 for (
unsigned int i = 0; i < max_handles && chunks_to_read.size() > 0; ++i) {
919 Chunk *chunk = chunks_to_read.front();
920 chunks_to_read.pop();
922 chunk->set_rbuf_to_size();
923 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(chunk);
924 if (!handle)
throw BESInternalError(
"No more libcurl handles.", __FILE__, __LINE__);
926 BESDEBUG(dmrpp_3,
"Queuing: " << chunk->to_string() << endl);
927 mhandle->add_easy_handle(handle);
929 chunks_to_insert.push(chunk);
932 mhandle->read_data();
934 while (chunks_to_insert.size() > 0) {
935 Chunk *chunk = chunks_to_insert.front();
936 chunks_to_insert.pop();
940 vector<unsigned int> target_element_address = chunk->get_position_in_array();
941 vector<unsigned int> chunk_source_address(dimensions(), 0);
943 BESDEBUG(dmrpp_3,
"Inserting: " << chunk->to_string() << endl);
944 insert_chunk(0 , &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
951 while (chunks_to_read.size() > 0) {
952 Chunk *chunk = chunks_to_read.front();
953 chunks_to_read.pop();
955 BESDEBUG(dmrpp_3,
"Reading: " << chunk->to_string() << endl);
960 vector<unsigned int> target_element_address = chunk->get_position_in_array();
961 vector<unsigned int> chunk_source_address(dimensions(), 0);
963 BESDEBUG(dmrpp_3,
"Inserting: " << chunk->to_string() << endl);
964 insert_chunk(0 , &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
984 static unsigned long multiplier(
const vector<unsigned int> &shape,
unsigned int k)
986 assert(shape.size() > 1);
987 assert(shape.size() > k + 1);
989 vector<unsigned int>::const_iterator i = shape.begin(), e = shape.end();
991 unsigned long multiplier = *i++;
1018 void DmrppArray::insert_chunk_unconstrained(Chunk *chunk,
unsigned int dim,
unsigned long long array_offset,
const vector<unsigned int> &array_shape,
1019 unsigned long long chunk_offset,
const vector<unsigned int> &chunk_shape,
const vector<unsigned int> &chunk_origin)
1024 dimension thisDim = this->get_dimension(dim);
1025 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1026 if ((
unsigned) thisDim.stop < end_element) {
1027 end_element = thisDim.stop;
1030 unsigned long long chunk_end = end_element - chunk_origin[dim];
1032 unsigned int last_dim = chunk_shape.size() - 1;
1033 if (dim == last_dim) {
1034 unsigned int elem_width = prototype()->width();
1036 array_offset += chunk_origin[dim];
1039 unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * elem_width;
1040 char *source_buffer = chunk->get_rbuf();
1041 char *target_buffer = get_buf();
1042 memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
1045 unsigned long mc = multiplier(chunk_shape, dim);
1046 unsigned long ma = multiplier(array_shape, dim);
1049 for (
unsigned int chunk_index = 0 ; chunk_index <= chunk_end; ++chunk_index) {
1050 unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
1051 unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));
1054 insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape, chunk_origin);
1067 void process_one_chunk_unconstrained(Chunk *chunk, DmrppArray *array,
const vector<unsigned int> &array_shape,
1068 const vector<unsigned int> &chunk_shape)
1070 chunk->read_chunk();
1072 if (array->is_deflate_compression() || array->is_shuffle_compression())
1073 chunk->inflate_chunk(array->is_deflate_compression(), array->is_shuffle_compression(), array->get_chunk_size_in_elements(),
1074 array->var()->width());
1076 array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
1079 void DmrppArray::read_chunks_unconstrained()
1082 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
1084 reserve_value_capacity(
get_size());
1087 const vector<unsigned int> array_shape =
get_shape(
true);
1089 const vector<unsigned int> chunk_shape = get_chunk_dimension_sizes();
1091 BESDEBUG(dmrpp_3, __func__ << endl);
1093 BESDEBUG(dmrpp_3,
"d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
1094 BESDEBUG(dmrpp_3,
"d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);
1096 if (DmrppRequestHandler::d_use_parallel_transfers) {
1097 queue<Chunk *> chunks_to_read;
1100 for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c)
1101 chunks_to_read.push(&(*c));
1105 int status = pipe(fds);
1107 throw BESInternalError(
string(
"Could not open a pipe for thread communication: ").append(strerror(errno)), __FILE__, __LINE__);
1110 pthread_t threads[DmrppRequestHandler::d_max_parallel_transfers];
1111 memset(&threads[0], 0,
sizeof(pthread_t) * DmrppRequestHandler::d_max_parallel_transfers);
1115 for (
unsigned int i = 0; i < (
unsigned int)DmrppRequestHandler::d_max_parallel_transfers; ++i) {
1116 memset(&threads[i], 0,
sizeof(pthread_t));
1122 unsigned int num_threads = 0;
1123 for (
unsigned int i = 0; i < (
unsigned int) DmrppRequestHandler::d_max_parallel_transfers && chunks_to_read.size() > 0; ++i) {
1124 Chunk *chunk = chunks_to_read.front();
1125 chunks_to_read.pop();
1128 one_chunk_unconstrained_args *args =
new one_chunk_unconstrained_args(fds, i, chunk,
this, array_shape, chunk_shape);
1129 int status = pthread_create(&threads[i], NULL, dmrpp::one_chunk_unconstrained_thread, (
void*) args);
1132 BESDEBUG(dmrpp_3,
"started thread: " << i << endl);
1135 ostringstream oss(
"Could not start process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
1136 oss << i <<
": " << strerror(status);
1137 BESDEBUG(dmrpp_3, oss.str());
1143 while (num_threads > 0) {
1146 int bytes =
::read(fds[0], &tid,
sizeof(tid));
1147 if (bytes !=
sizeof(tid))
1148 throw BESInternalError(
string(
"Could not read the thread id: ").append(strerror(errno)), __FILE__, __LINE__);
1150 if (!(tid < DmrppRequestHandler::d_max_parallel_transfers)) {
1151 ostringstream oss(
"Invalid thread id read after thread exit: ", std::ios::ate);
1157 int status = pthread_join(threads[tid], (
void**)&error);
1159 BESDEBUG(dmrpp_3,
"joined thread: " << (
unsigned int)tid <<
", there are: " << num_threads << endl);
1162 ostringstream oss(
"Could not join process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
1163 oss << tid <<
": " << strerror(status);
1166 else if (error != 0) {
1171 else if (chunks_to_read.size() > 0) {
1172 Chunk *chunk = chunks_to_read.front();
1173 chunks_to_read.pop();
1176 one_chunk_unconstrained_args *args =
new one_chunk_unconstrained_args(fds, tid, chunk,
this, array_shape, chunk_shape);
1177 int status = pthread_create(&threads[tid], NULL, dmrpp::one_chunk_unconstrained_thread, (
void*) args);
1180 oss <<
"Could not start process_one_chunk_unconstrained thread for chunk " << tid <<
": " << strerror(status);
1184 BESDEBUG(dmrpp_3,
"started thread: " << (
unsigned int)tid <<
", there are: " << threads << endl);
1195 join_threads(threads, DmrppRequestHandler::d_max_parallel_transfers);
1204 for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c) {
1206 process_one_chunk_unconstrained(&chunk,
this, array_shape, chunk_shape);
1226 if (read_p())
return true;
1230 if (get_immutable_chunks().size() == 1 || get_chunk_dimension_sizes().empty()) {
1231 BESDEBUG(dmrpp_4,
"Calling read_contiguous() for " << name() << endl);
1235 if (!is_projected()) {
1236 BESDEBUG(dmrpp_4,
"Calling read_chunks_unconstrained() for " << name() << endl);
1237 read_chunks_unconstrained();
1240 BESDEBUG(dmrpp_4,
"Calling read_chunks() for " << name() << endl);
1251 class PrintD4ArrayDimXMLWriter:
public unary_function<Array::dimension&, void> {
1259 PrintD4ArrayDimXMLWriter(XMLWriter &xml,
bool c) :
1260 xml(xml), d_constrained(c)
1264 void operator()(Array::dimension &d)
1270 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar*)
"Dim") < 0)
1271 throw InternalErr(__FILE__, __LINE__,
"Could not write Dim element");
1273 string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
1276 if (!d_constrained && !name.empty()) {
1277 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"name", (
const xmlChar*) name.c_str()) < 0)
1278 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1280 else if (d.use_sdim_for_slice) {
1281 assert(!name.empty());
1282 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"name", (
const xmlChar*) name.c_str()) < 0)
1283 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1287 size << (d_constrained ? d.c_size : d.size);
1288 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"size", (
const xmlChar*) size.str().c_str()) < 0)
1289 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1292 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
throw InternalErr(__FILE__, __LINE__,
"Could not end Dim element");
1296 class PrintD4ConstructorVarXMLWriter:
public unary_function<BaseType*, void> {
1300 PrintD4ConstructorVarXMLWriter(XMLWriter &xml,
bool c) :
1301 xml(xml), d_constrained(c)
1305 void operator()(BaseType *btp)
1307 btp->print_dap4(xml, d_constrained);
1311 class PrintD4MapXMLWriter:
public unary_function<D4Map*, void> {
1315 PrintD4MapXMLWriter(XMLWriter &xml) :
1320 void operator()(D4Map *m)
1352 if (constrained && !send_p())
return;
1354 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar*) var()->type_name().c_str()) < 0)
1355 throw InternalErr(__FILE__, __LINE__,
"Could not write " + type_name() +
" element");
1357 if (!name().empty())
1358 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"name", (
const xmlChar*) name().c_str()) < 0)
1359 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1362 if (var()->type() == dods_enum_c) {
1363 D4Enum *e =
static_cast<D4Enum*
>(var());
1364 string path = e->enumeration()->name();
1365 if (e->enumeration()->parent()) {
1367 path =
static_cast<D4Group*
>(e->enumeration()->parent()->parent())->FQN() + path;
1369 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"enum", (
const xmlChar*) path.c_str()) < 0)
1370 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for enum");
1373 if (prototype()->is_constructor_type()) {
1374 Constructor &c =
static_cast<Constructor&
>(*prototype());
1375 for_each(c.var_begin(), c.var_end(), PrintD4ConstructorVarXMLWriter(xml, constrained));
1380 for_each(dim_begin(), dim_end(), PrintD4ArrayDimXMLWriter(xml, constrained));
1382 attributes()->print_dap4(xml);
1384 for_each(maps()->map_begin(), maps()->map_end(), PrintD4MapXMLWriter(xml));
1390 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
throw InternalErr(__FILE__, __LINE__,
"Could not end " + type_name() +
" element");
1393 void DmrppArray::dump(ostream & strm)
const
1395 strm << BESIndent::LMarg <<
"DmrppArray::" << __func__ <<
"(" << (
void *)
this <<
")" << endl;
1396 BESIndent::Indent();
1397 DmrppCommon::dump(strm);
1399 strm << BESIndent::LMarg <<
"value: " <<
"----" << endl;
1400 BESIndent::UnIndent();