44#include "D4StreamMarshaller.h"
45#ifdef USE_POSIX_THREADS
46#include "MarshallerThread.h"
49#if USE_XDR_FOR_IEEE754_ENCODING
/**
 * Encode a 64-bit unsigned integer as a base-128 varint.
 *
 * The value is emitted seven bits at a time, least-significant group first.
 * Every byte except the last has bit 0x80 set to signal that more bytes
 * follow; the last byte has it clear. Between 1 and 10 bytes are produced
 * (10 for values with any of the top bit set, 1 for values below 128).
 *
 * @param value  Integer to encode.
 * @param target Destination buffer. No bounds checking is done, so the caller
 *               must guarantee room for up to 10 bytes.
 * @return Pointer to one past the last byte written.
 */
inline uint8_t* WriteVarint64ToArrayInline(uint64_t value, uint8_t* target) {
    // Emit full 7-bit groups while more significant bits remain.
    while (value >= 0x80) {
        *target++ = static_cast<uint8_t>((value & 0x7F) | 0x80);
        value >>= 7;
    }
    // Final group: continuation bit left clear.
    *target++ = static_cast<uint8_t>(value);
    return target;
}
138#if USE_XDR_FOR_IEEE754_ENCODING
// Serialize an array of floating-point values through an in-memory XDR encoder
// and write the encoded bytes to d_out.
//
// @param val   Source array.
// @param num   Number of elements in val.
// @param width Size of one element in bytes.
// @param type  Element type. NOTE(review): presumably selects the XDR coder and
//              the 32- vs 64-bit handling further down; confirm.
// @throws InternalErr if encoding fails or if the encoder's final position does
//         not equal the expected byte count.
139void D4StreamMarshaller::m_serialize_reals(
char *val,
unsigned int num,
int width, Type type)
// NOTE(review): num * width is evaluated in unsigned int and only then widened,
// so a large array can wrap here; widening one operand first would avoid that.
141 dods_uint64 size = num * width;
// NOTE(review): buf is declared as a raw char*, yet every later use calls
// buf.data(), which exists only on a container such as std::vector<char>. As
// written this cannot compile, and no delete[] for this new[] is visible.
// Declaring buf as std::vector<char> buf(size) would resolve both.
143 char *buf =
new char[size];
// Attach an XDR encode stream to the scratch buffer.
145 xdrmem_create(&xdr, buf.data(), size, XDR_ENCODE);
// NOTE(review): both messages below say "Float64" although 32-bit reals are
// handled by this function as well.
148 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 array");
// The encoder must have produced exactly `size` bytes.
150 if (xdr_getpos(&xdr) != size)
151 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 array");
// View the encoded buffer as 32-bit elements and alias each one as an integer.
// NOTE(review): presumably for an in-place byte swap; confirm.
157 dods_float32 *lbuf =
reinterpret_cast<dods_float32*
>(buf.data());
159 dods_int32 *i =
reinterpret_cast<dods_int32*
>(lbuf++);
// Same treatment for 64-bit elements.
164 dods_float64 *lbuf =
reinterpret_cast<dods_float64*
>(buf.data());
166 dods_int64 *i =
reinterpret_cast<dods_int64*
>(lbuf++);
171#ifdef USE_POSIX_THREADS
// Construct a Locker from the thread manager's mutex, condition and child-thread
// count. NOTE(review): presumably waits for any in-flight writer; confirm.
172 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
174 tm->increment_child_thread_count();
// Emit the encoded bytes.
180 d_out.write(buf.data(), size);
// Construct a marshaller bound to an output stream.
//
// @param out        Destination stream, held by reference in d_out. Its
//                   exception mask is set to failbit | badbit below.
// @param write_data Stored in d_write_data.
// The thread-manager pointer tm starts out null.
201D4StreamMarshaller::D4StreamMarshaller(ostream &out,
bool write_data) :
202 d_out(out), d_write_data(write_data), tm(0)
// 64-bit byte counts are handed to ostream::write(), so streamsize must be
// able to hold them. NOTE(review): the operands are compile-time constants, so
// this could be a static_assert.
204 assert(
sizeof(std::streamsize) >=
sizeof(int64_t));
206#if USE_XDR_FOR_IEEE754_ENCODING
// Scalar XDR encoder writing into d_ieee754_buf, sized for one 64-bit real.
210 xdrmem_create(&d_scalar_sink, d_ieee754_buf,
sizeof(dods_float64), XDR_ENCODE);
213#ifdef USE_POSIX_THREADS
// Make stream write failures throw instead of only setting state bits.
219 out.exceptions(ostream::failbit | ostream::badbit);
// Destructor. When XDR encoding is compiled in, releases the scalar XDR sink
// that the constructor set up.
222D4StreamMarshaller::~D4StreamMarshaller()
224#if USE_XDR_FOR_IEEE754_ENCODING
225 xdr_destroy(&d_scalar_sink);
251 oss.setf(ios::hex, ios::basefield);
252 oss << setfill(
'0') << setw(8) << d_checksum.
GetCrc32();
265 Crc32::checksum chk = d_checksum.
GetCrc32();
266#ifdef USE_POSIX_THREADS
267 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
269 d_out.write(
reinterpret_cast<char*
>(&chk),
sizeof(Crc32::checksum));
278 d_checksum.
AddData(
reinterpret_cast<const uint8_t*
>(data), len);
// Write a single byte value to the output stream.
//
// @param val Value to write; its in-memory representation is emitted unchanged.
281void D4StreamMarshaller::put_byte(dods_byte val)
// NOTE(review): if dods_byte is a character type, this prints val as a
// character rather than a number; confirm and cast to int if so.
286 DBG( std::cerr <<
"put_byte: " << val << std::endl );
287#ifdef USE_POSIX_THREADS
// Construct a Locker from the thread manager's mutex, condition and child-thread
// count. NOTE(review): presumably waits for any in-flight writer; confirm.
289 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
291 d_out.write(
reinterpret_cast<char*
>(&val),
sizeof(dods_byte));
// Write a single 8-bit signed value to the output stream.
//
// @param val Value to write; its in-memory representation is emitted unchanged.
295void D4StreamMarshaller::put_int8(dods_int8 val)
// NOTE(review): if dods_int8 is a character type, this prints val as a
// character rather than a number; confirm and cast to int if so.
300 DBG( std::cerr <<
"put_int8: " << val << std::endl );
301#ifdef USE_POSIX_THREADS
// Construct a Locker from the thread manager's mutex, condition and child-thread
// count. NOTE(review): presumably waits for any in-flight writer; confirm.
302 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
304 d_out.write(
reinterpret_cast<char*
>(&val),
sizeof(dods_int8));
// Write a 16-bit signed value to the output stream.
//
// @param val Value to write. Its bytes are emitted exactly as stored in memory,
//            so the output uses the host's byte order.
308void D4StreamMarshaller::put_int16(dods_int16 val)
313#ifdef USE_POSIX_THREADS
// Construct a Locker from the thread manager's mutex, condition and child-thread
// count. NOTE(review): presumably waits for any in-flight writer; confirm.
314 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
316 d_out.write(
reinterpret_cast<char*
>(&val),
sizeof(dods_int16));
// Write a 32-bit signed value to the output stream.
//
// @param val Value to write. Its bytes are emitted exactly as stored in memory,
//            so the output uses the host's byte order.
320void D4StreamMarshaller::put_int32(dods_int32 val)
325#ifdef USE_POSIX_THREADS
// Construct a Locker from the thread manager's mutex, condition and child-thread
// count. NOTE(review): presumably waits for any in-flight writer; confirm.
326 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
328 d_out.write(
reinterpret_cast<char*
>(&val),
sizeof(dods_int32));
// Write a 64-bit signed value to the output stream.
//
// @param val Value to write. Its bytes are emitted exactly as stored in memory,
//            so the output uses the host's byte order.
332void D4StreamMarshaller::put_int64(dods_int64 val)
337#ifdef USE_POSIX_THREADS
// Construct a Locker from the thread manager's mutex, condition and child-thread
// count. NOTE(review): presumably waits for any in-flight writer; confirm.
338 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
340 d_out.write(
reinterpret_cast<const char*
>(&val),
sizeof(dods_int64));
// Write a 32-bit floating-point value to the output stream.
//
// Two implementations are selected by USE_XDR_FOR_IEEE754_ENCODING:
//  - XDR disabled: the host float format is asserted to be IEC 559 and the
//    bytes of val are written directly.
//  - XDR enabled: an IEC 559 host writes the bytes of val directly; the XDR
//    statements that follow encode val into d_ieee754_buf through
//    d_scalar_sink and write that buffer. NOTE(review): presumably the
//    else-branch of the is_iec559 test; confirm.
//
// @param val Value to write.
// @throws InternalErr (XDR path) if repositioning the sink, encoding, or the
//         post-encode position check fails.
344void D4StreamMarshaller::put_float32(dods_float32 val)
346#if !USE_XDR_FOR_IEEE754_ENCODING
// NOTE(review): is_iec559 is a compile-time constant; a static_assert would
// also catch this in builds where assert() is compiled out.
347 assert(std::numeric_limits<float>::is_iec559);
352#ifdef USE_POSIX_THREADS
353 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
355 d_out.write(
reinterpret_cast<const char*
>(&val),
sizeof(dods_float32));
// XDR-enabled build: native layout is already IEC 559, so write it as is.
365 if (std::numeric_limits<float>::is_iec559 ) {
366#ifdef USE_POSIX_THREADS
367 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
369 d_out.write(
reinterpret_cast<char*
>(&val),
sizeof(dods_float32));
// Rewind the scalar sink so the value is encoded at the start of d_ieee754_buf.
372 if (!xdr_setpos(&d_scalar_sink, 0))
373 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
375 if (!xdr_float(&d_scalar_sink, &val))
376 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
// Exactly four bytes must have been produced.
378 if (xdr_getpos(&d_scalar_sink) !=
sizeof(dods_float32))
379 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
// Alias the encoded bytes as a 32-bit integer.
// NOTE(review): presumably for a byte swap on little-endian hosts; confirm.
384 dods_int32 *i =
reinterpret_cast<dods_int32*
>(&d_ieee754_buf);
387#ifdef USE_POSIX_THREADS
388 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
390 d_out.write(d_ieee754_buf,
sizeof(dods_float32));
// Write a 64-bit floating-point value to the output stream.
//
// Two implementations are selected by USE_XDR_FOR_IEEE754_ENCODING:
//  - XDR disabled: the host double format is asserted to be IEC 559 and the
//    bytes of val are written directly.
//  - XDR enabled: an IEC 559 host writes the bytes of val directly; the XDR
//    statements that follow encode val into d_ieee754_buf through
//    d_scalar_sink and write that buffer. NOTE(review): presumably the
//    else-branch of the is_iec559 test; confirm.
//
// @param val Value to write.
// @throws InternalErr (XDR path) if repositioning the sink, encoding, or the
//         post-encode position check fails.
396void D4StreamMarshaller::put_float64(dods_float64 val)
398#if !USE_XDR_FOR_IEEE754_ENCODING
// NOTE(review): is_iec559 is a compile-time constant; a static_assert would
// also catch this in builds where assert() is compiled out.
399 assert(std::numeric_limits<double>::is_iec559);
404#ifdef USE_POSIX_THREADS
405 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
407 d_out.write(
reinterpret_cast<const char*
>(&val),
sizeof(dods_float64));
// XDR-enabled build: native layout is already IEC 559, so write it as is.
413 if (std::numeric_limits<double>::is_iec559) {
414#ifdef USE_POSIX_THREADS
415 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
417 d_out.write(
reinterpret_cast<char*
>(&val),
sizeof(dods_float64));}
// Rewind the scalar sink so the value is encoded at the start of d_ieee754_buf.
420 if (!xdr_setpos(&d_scalar_sink, 0))
421 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
423 if (!xdr_double(&d_scalar_sink, &val))
424 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
// Exactly eight bytes must have been produced.
426 if (xdr_getpos(&d_scalar_sink) !=
sizeof(dods_float64))
427 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
// Alias the encoded bytes as a 64-bit integer.
// NOTE(review): presumably for a byte swap on little-endian hosts; confirm.
432 dods_int64 *i =
reinterpret_cast<dods_int64*
>(&d_ieee754_buf);
436#ifdef USE_POSIX_THREADS
437 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
439 d_out.write(d_ieee754_buf,
sizeof(dods_float64));
// Write a 16-bit unsigned value to the output stream and fold it into the
// running checksum.
//
// @param val Value to write. Its bytes are emitted exactly as stored in memory,
//            so the output uses the host's byte order.
445void D4StreamMarshaller::put_uint16(dods_uint16 val)
// The checksum is computed over the same bytes that are written below.
447 checksum_update(&val,
sizeof(dods_uint16));
450#ifdef USE_POSIX_THREADS
// Construct a Locker from the thread manager's mutex, condition and child-thread
// count. NOTE(review): presumably waits for any in-flight writer; confirm.
451 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
453 d_out.write(
reinterpret_cast<char*
>(&val),
sizeof(dods_uint16));
// Write a 32-bit unsigned value to the output stream and fold it into the
// running checksum.
//
// @param val Value to write. Its bytes are emitted exactly as stored in memory,
//            so the output uses the host's byte order.
457void D4StreamMarshaller::put_uint32(dods_uint32 val)
// The checksum is computed over the same bytes that are written below.
459 checksum_update(&val,
sizeof(dods_uint32));
462#ifdef USE_POSIX_THREADS
// Construct a Locker from the thread manager's mutex, condition and child-thread
// count. NOTE(review): presumably waits for any in-flight writer; confirm.
463 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
465 d_out.write(
reinterpret_cast<char*
>(&val),
sizeof(dods_uint32));
// Write a 64-bit unsigned value to the output stream and fold it into the
// running checksum.
//
// @param val Value to write. Its bytes are emitted exactly as stored in memory,
//            so the output uses the host's byte order.
469void D4StreamMarshaller::put_uint64(dods_uint64 val)
// The checksum is computed over the same bytes that are written below.
471 checksum_update(&val,
sizeof(dods_uint64));
474#ifdef USE_POSIX_THREADS
// Construct a Locker from the thread manager's mutex, condition and child-thread
// count. NOTE(review): presumably waits for any in-flight writer; confirm.
475 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
477 d_out.write(
reinterpret_cast<char*
>(&val),
sizeof(dods_uint64));
// Write an element/byte count to the output stream as a fixed-width 64-bit
// integer in host byte order.
//
// @param count Count to write.
// NOTE(review): the visible statements always emit sizeof(int64_t) bytes; the
// varint helper defined earlier in this file is not used here.
489void D4StreamMarshaller::put_count(int64_t count)
491#ifdef USE_POSIX_THREADS
// Construct a Locker from the thread manager's mutex, condition and child-thread
// count. NOTE(review): presumably waits for any in-flight writer; confirm.
492 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
494 d_out.write(
reinterpret_cast<const char*
>(&count),
sizeof(int64_t));
// Write a string to the output stream as a 64-bit length prefix followed by the
// string's bytes (no terminator), and fold the bytes into the running checksum.
//
// @param val String to write. Embedded NUL characters are preserved because
//            length() bytes are written, not a C string.
497void D4StreamMarshaller::put_str(
const string &val)
// Only the character data is checksummed; the length prefix is not.
499 checksum_update(val.c_str(), val.length());
502 int64_t len = val.length();
503#ifdef USE_POSIX_THREADS
// Construct a Locker from the thread manager's mutex, condition and child-thread
// count. NOTE(review): presumably waits for any in-flight writer; confirm.
504 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
// Length prefix in host byte order, then the payload.
506 d_out.write(
reinterpret_cast<const char*
>(&len),
sizeof(int64_t));
507 d_out.write(val.data(), val.length());
511void D4StreamMarshaller::put_url(
const string &val)
// Write an opaque byte blob as a 64-bit length prefix followed by the bytes,
// and fold the bytes into the running checksum.
//
// Threaded build: the length is written here, then the payload is copied to a
// heap buffer and handed to a writer thread. Non-threaded build: length and
// payload are written directly.
//
// @param val Bytes to write.
// @param len Number of bytes in val.
// NOTE(review): no guard against a negative len is visible before it is used
// as an allocation size and a memcpy length; confirm callers guarantee len >= 0.
516void D4StreamMarshaller::put_opaque_dap4(
const char *val, int64_t len)
521 checksum_update(val, len);
524#ifdef USE_POSIX_THREADS
525 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
527 d_out.write(
reinterpret_cast<const char*
>(&len),
sizeof(int64_t));
// Private copy so the caller's buffer need not outlive the background write.
// NOTE(review): ownership passes to start_thread(); presumably the writer
// thread releases it with delete[]. If start_thread() throws, the buffer leaks.
529 char *byte_buf =
new char[len];
530 memcpy(byte_buf, val, len);
532 tm->increment_child_thread_count();
533 tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, len);
// Non-threaded path: same wire layout, written synchronously.
535 d_out.write(
reinterpret_cast<const char*
>(&len),
sizeof(int64_t));
536 d_out.write(val, len);
// Write a vector of single-byte elements to the output stream and fold the
// bytes into the running checksum. No length prefix is written here.
//
// @param val       Bytes to write.
// @param num_bytes Number of bytes in val; must be non-negative.
// NOTE(review): the non-negative check is an assert and so disappears in
// NDEBUG builds.
546void D4StreamMarshaller::put_vector(
char *val, int64_t num_bytes)
549 assert(num_bytes >= 0);
551 checksum_update(val, num_bytes);
554#ifdef USE_POSIX_THREADS
555 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
// Private copy so the caller's buffer need not outlive the background write.
// NOTE(review): ownership passes to start_thread(); presumably the writer
// thread releases it with delete[].
557 char *buf =
new char[num_bytes];
558 memcpy(buf, val, num_bytes);
560 tm->increment_child_thread_count();
561 tm->start_thread(MarshallerThread::write_thread, d_out, buf, num_bytes);
// Non-threaded path: synchronous write.
563 d_out.write(val, num_bytes);
// Write a vector of fixed-width elements to the output stream and fold the
// bytes into the running checksum. No length prefix is written here.
//
// The byte count is derived from num_elem and elem_size: a left shift by 1, 2
// or 3 for the power-of-two widths, each guarded by an assert that the bits
// about to be shifted out are clear, and a plain multiplication otherwise.
//
// @param val       Elements to write, in host byte order.
// @param num_elem  Number of elements; must be non-negative.
// @param elem_size Size of one element in bytes; must be positive and not 1
//                  (single-byte data goes through put_vector(val, bytes)).
568void D4StreamMarshaller::put_vector(
char *val, int64_t num_elem,
int elem_size)
571 assert(num_elem >= 0);
572 assert(elem_size > 0);
// Always-false assert carrying a message.
578 assert(!
"Don't call this method for bytes, use put_vector(val, bytes) instead");
// 2-byte elements: bit 62 must be clear so the shift stays non-negative.
583 assert(!(num_elem & 0x4000000000000000));
584 bytes = num_elem << 1;
// 4-byte elements: bits 62-61 must be clear.
587 assert(!(num_elem & 0x6000000000000000));
588 bytes = num_elem << 2;
// 8-byte elements: bits 62-60 must be clear.
591 assert(!(num_elem & 0x7000000000000000));
592 bytes = num_elem << 3;
// NOTE(review): unlike the shifts above, this multiplication has no overflow
// guard.
595 bytes = num_elem * elem_size;
599 checksum_update(val, bytes);
602#ifdef USE_POSIX_THREADS
603 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
// Private copy so the caller's buffer need not outlive the background write.
// NOTE(review): ownership passes to start_thread(); presumably the writer
// thread releases it with delete[].
605 char *buf =
new char[bytes];
606 memcpy(buf, val, bytes);
608 tm->increment_child_thread_count();
609 tm->start_thread(MarshallerThread::write_thread, d_out, buf, bytes);
// Non-threaded path: synchronous write.
611 d_out.write(val, bytes);
// Write a vector of 32-bit floats to the output stream and fold the bytes into
// the running checksum. No length prefix is written here.
//
// Two implementations are selected by USE_XDR_FOR_IEEE754_ENCODING. Without
// XDR the host float format is asserted to be IEC 559 and the raw bytes are
// written. With XDR, a non-IEC 559 host goes through m_serialize_reals() and
// an IEC 559 host writes the raw bytes.
//
// @param val      Elements to write.
// @param num_elem Number of elements; must be non-negative and small enough
//                 that num_elem * 4 fits in int64_t.
625void D4StreamMarshaller::put_vector_float32(
char *val, int64_t num_elem)
627#if !USE_XDR_FOR_IEEE754_ENCODING
629 assert(std::numeric_limits<float>::is_iec559);
631 assert(num_elem >= 0);
// The top three bits must be clear so the << 2 below cannot overflow.
636 assert(!(num_elem & 0xe000000000000000));
// From here on num_elem holds a byte count, not an element count.
638 num_elem = num_elem << 2;
640 checksum_update(val, num_elem);
643#ifdef USE_POSIX_THREADS
644 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
// Private copy so the caller's buffer need not outlive the background write.
// NOTE(review): ownership passes to start_thread(); presumably the writer
// thread releases it with delete[].
646 char *buf =
new char[num_elem];
647 memcpy(buf, val, num_elem);
649 tm->increment_child_thread_count();
650 tm->start_thread(MarshallerThread::write_thread, d_out, buf, num_elem);
652 d_out.write(val, num_elem);
// XDR-enabled variant starts here.
658 assert(num_elem >= 0);
663 assert(!(num_elem & 0xe000000000000000));
665 int64_t bytes = num_elem << 2;
// The checksum covers the caller's native bytes, before any XDR encoding.
667 checksum_update(val, bytes);
670 if (!std::numeric_limits<float>::is_iec559) {
// NOTE(review): `type` is not among this function's visible parameters, and
// num_elem (int64_t) narrows to m_serialize_reals()'s unsigned int `num`.
// Confirm where `type` comes from and whether large arrays are truncated.
672 m_serialize_reals(val, num_elem, 4, type);
675#ifdef USE_POSIX_THREADS
676 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
678 char *buf =
new char[bytes];
679 memcpy(buf, val, bytes);
681 tm->increment_child_thread_count();
682 tm->start_thread(MarshallerThread::write_thread, d_out, buf, bytes);
684 d_out.write(val, bytes);
// Write a vector of 64-bit floats to the output stream and fold the bytes into
// the running checksum. No length prefix is written here.
//
// Two implementations are selected by USE_XDR_FOR_IEEE754_ENCODING. Without
// XDR the host double format is asserted to be IEC 559 and the raw bytes are
// written. With XDR, a non-IEC 559 host goes through m_serialize_reals() and
// an IEC 559 host writes the raw bytes.
//
// @param val      Elements to write.
// @param num_elem Number of elements; must be non-negative and small enough
//                 that num_elem * 8 fits in int64_t.
699void D4StreamMarshaller::put_vector_float64(
char *val, int64_t num_elem)
701#if !USE_XDR_FOR_IEEE754_ENCODING
703 assert(std::numeric_limits<double>::is_iec559);
705 assert(num_elem >= 0);
// The top four bits must be clear so the << 3 below cannot overflow.
707 assert(!(num_elem & 0xf000000000000000));
// From here on num_elem holds a byte count, not an element count.
709 num_elem = num_elem << 3;
711 checksum_update(val, num_elem);
714#ifdef USE_POSIX_THREADS
715 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
// Private copy so the caller's buffer need not outlive the background write.
// NOTE(review): ownership passes to start_thread(); presumably the writer
// thread releases it with delete[].
717 char *buf =
new char[num_elem];
718 memcpy(buf, val, num_elem);
720 tm->increment_child_thread_count();
721 tm->start_thread(MarshallerThread::write_thread, d_out, buf, num_elem);
723 d_out.write(val, num_elem);
// XDR-enabled variant starts here.
728 assert(num_elem >= 0);
// NOTE(review): this mask clears only the top three bits, but the shift below
// is by 3, so bit 60 can still be shifted into the sign bit. The non-XDR
// variant above uses 0xf000000000000000 for the same shift; this one looks
// copied from the float32 code and should probably match.
733 assert(!(num_elem & 0xe000000000000000));
735 int64_t bytes = num_elem << 3;
// The checksum covers the caller's native bytes, before any XDR encoding.
737 checksum_update(val, bytes);
740 if (!std::numeric_limits<double>::is_iec559) {
// NOTE(review): `type` is not among this function's visible parameters, and
// num_elem (int64_t) narrows to m_serialize_reals()'s unsigned int `num`.
// Confirm where `type` comes from and whether large arrays are truncated.
742 m_serialize_reals(val, num_elem, 8, type);
745#ifdef USE_POSIX_THREADS
746 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
748 char *buf =
new char[bytes];
749 memcpy(buf, val, bytes);
751 tm->increment_child_thread_count();
752 tm->start_thread(MarshallerThread::write_thread, d_out, buf, bytes);
754 d_out.write(val, bytes);
// Write part of a vector by dispatching on the element type to the matching
// put_vector* overload. Types that cannot be array elements are rejected with
// an exception.
//
// @param val   Elements to write.
// @param num   Number of elements.
// @param width Size of one element in bytes.
// @param type  Element type used to choose the writer.
// @throws InternalErr for string, array, structure, sequence, grid and
//         unrecognised element types.
762void D4StreamMarshaller::put_vector_part(
char *val,
unsigned int num,
int width,
Type type)
// Single-byte elements.
773 put_vector(val, num);
// Multi-byte integer elements.
782 put_vector(val, num, width);
// NOTE(review): this pair presumably handles a type whose width varies
// (one byte vs. wider); confirm against the case labels.
787 put_vector(val, num);
789 put_vector(val, num, width);
793 put_vector_float32(val, num);
// NOTE(review): this second real-number branch also calls put_vector_float32.
// If it is the 64-bit float case, only num * 4 bytes are checksummed and
// written, truncating the data; it should presumably be put_vector_float64.
797 put_vector_float32(val, num);
802 throw InternalErr(__FILE__, __LINE__,
"Array of String should not be passed to put_vector.");
805 throw InternalErr(__FILE__, __LINE__,
"Array of Array not allowed.");
808 case dods_structure_c:
809 case dods_sequence_c:
// NOTE(review): the message says "String" but these labels are structure and
// sequence; it looks copied from the string case above.
810 throw InternalErr(__FILE__, __LINE__,
"Array of String should not be passed to put_vector.");
813 throw InternalErr(__FILE__, __LINE__,
"Grid is not part of DAP4.");
816 throw InternalErr(__FILE__, __LINE__,
"Unknown datatype.");
// Write a one-line diagnostic description of this object to a stream: the
// current indent margin, the class/method label, and this object's address.
//
// @param strm Stream that receives the text. The line ends with endl, so the
//             stream is flushed.
820void D4StreamMarshaller::dump(ostream &strm)
const
822 strm << DapIndent::LMarg <<
"D4StreamMarshaller::dump - (" << (
void *)
this <<
")" << endl;
checksum GetCrc32() const
void AddData(const uint8_t *pData, const uint32_t length)
virtual void put_checksum()
Write the checksum for the data sent since the last call to reset_checksum() to the output stream.
virtual void reset_checksum()
virtual void checksum_update(const void *data, unsigned long len)
virtual string get_checksum()
A class for software fault reporting.
static void * write_thread(void *arg)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
static xdrproc_t xdr_coder(const Type &t)
Returns a function used to encode elements of an array.
top level DAP object to house generic methods
Type
Identifies the data type.
bool is_host_big_endian()
Does this host use big-endian byte order?