35 #include <curl/curl.h>
38 #include <curl/multi.h>
47 #include "BESInternalError.h"
48 #include "BESForbiddenError.h"
49 #include <TheBESKeys.h>
50 #include "WhiteList.h"
52 #include "DmrppRequestHandler.h"
53 #include "DmrppCommon.h"
55 #include "CurlHandlePool.h"
57 #include "CredentialsManager.h"
61 #define CURL_VERBOSE 0
63 static const int MAX_WAIT_MSECS = 30*1000;
64 static const unsigned int retry_limit = 10;
65 static const unsigned int initial_retry_time = 1000;
67 using namespace dmrpp;
71 #define MODULE "dmrpp:curl_handle_pool"
// Lock: helper that locks a pthread mutex for CurlHandlePool. Elsewhere in
// this file it is created as a local object: Lock lock(d_get_easy_handle_mutex).
// Constructor: initializes m_mutex from `lock` and locks it immediately; a
// non-zero status from pthread_mutex_lock() raises BESInternalError.
73 Lock::Lock(pthread_mutex_t &lock) : m_mutex(lock)
75 int status = pthread_mutex_lock(&m_mutex);
76 if (status != 0)
throw BESInternalError(
"Could not lock in CurlHandlePool", __FILE__, __LINE__);
// Unlock path (presumably the destructor - its signature line is not visible
// in this listing). A failure is reported through ERROR() instead of a throw.
81 int status = pthread_mutex_unlock(&m_mutex);
83 ERROR(
"Could not unlock in CurlHandlePool");
// Build a human-readable message for a libcurl result code.
//   res    - the CURLcode being reported.
//   errbuf - C string measured with strlen(), so it must be non-null and
//            NUL-terminated.
// Two output forms are visible: one ends with " (code: N)", the other is
// curl_easy_strerror(res) followed by "(result: N)". The condition choosing
// between them is not visible here - presumably the strlen() result; confirm.
90 curl_error_msg(CURLcode res,
char *errbuf)
93 size_t len = strlen(errbuf);
96 oss <<
" (code: " << (
int)res <<
")";
99 oss << curl_easy_strerror(res) <<
"(result: " << res <<
")";
// Build a string describing `size` bytes at `ptr`, prefixed by `text`.
// The output starts with a header line (text plus the size written twice:
// once at the current base, once after std::setbase(16)), then walks the
// buffer in rows of `width` (0x10) bytes: an offset, a byte column, and a
// character column where bytes outside [0x20, 0x80) are shown as a dot.
112 string dump(
const char *text,
unsigned char *ptr,
size_t size)
116 unsigned int width=0x10;
119 oss << text <<
", " << std::setw(10) << (long)size << std::setbase(16) << (long)size << endl;
121 for(i=0; i<size; i+= width) {
122 oss << std::setw(4) << (long)i;
// NOTE(review): this loop header has no `i+c < size` guard (the character
// loop below does). Confirm the body guards the read on the final short row.
126 for(c = 0; c < width; c++) {
// NOTE(review): ptr[i+c] is an unsigned char, so operator<< writes it as a
// character, not as a number - setbase(16) has no effect on it. A cast to an
// integer type is probably intended if this column is meant to be hex.
128 oss << std::setw(2) << ptr[i+c];
138 for(c = 0; (c < width) && (i+c < size); c++) {
// NOTE(review): the upper bound 0x80 lets 0x7f (DEL) through as printable.
139 char x = (ptr[i+c] >= 0x20 && ptr[i+c] < 0x80) ? ptr[i+c] :
'.';
141 oss << std::setw(1) << x;
// libcurl debug callback; it is installed with CURLOPT_DEBUGFUNCTION in the
// easy-handle setup code in this file. Each trace record is written to the
// log with a prefix that depends on `type`.
// Only `type` and `data` are named parameters; the handle, the size and the
// user pointer are unnamed and therefore unused.
// NOTE(review): libcurl does not NUL-terminate `data`, and the size argument
// is ignored here - confirm how `text` is built from `data` (not visible).
159 int curl_trace(CURL *, curl_infotype type,
char *data,
size_t ,
void *)
// First switch: adjust `text` per record type before it is logged.
165 case CURLINFO_HEADER_OUT:
166 case CURLINFO_HEADER_IN: {
// Headers: cut the text at the first newline. After one substr() no newline
// remains, so this loop body runs at most once.
169 while ((pos = text.find(
'\n')) != string::npos)
170 text = text.substr(0, pos);
175 case CURLINFO_DATA_OUT:
176 case CURLINFO_SSL_DATA_OUT:
177 case CURLINFO_DATA_IN:
178 case CURLINFO_SSL_DATA_IN:
// Second switch: emit one log line per record type.
186 LOG(
"libcurl == Info: " << text << endl);
189 case CURLINFO_HEADER_OUT:
190 LOG(
"libcurl == Send header: " << text << endl);
192 case CURLINFO_HEADER_IN:
193 LOG(
"libcurl == Recv header: " << text << endl);
197 case CURLINFO_DATA_OUT:
198 LOG(
"libcurl == Send data" << text << endl);
200 case CURLINFO_SSL_DATA_OUT:
201 LOG(
"libcurl == Send SSL data" << text << endl);
203 case CURLINFO_DATA_IN:
204 LOG(
"libcurl == Recv data" << text << endl);
206 case CURLINFO_SSL_DATA_IN:
207 LOG(
"libcurl == Recv SSL data" << text << endl);
// Body of what appears to be the dmrpp_easy_handle constructor (its signature
// line is not visible in this listing). Creates the libcurl easy handle and
// applies per-handle options; every failure raises BESInternalError.
220 d_handle = curl_easy_init();
221 if (!d_handle)
throw BESInternalError(
"Could not allocate CURL handle", __FILE__, __LINE__);
// Register d_errbuf first so that later failures can report its contents.
// This one check uses curl_easy_strerror() directly.
225 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_ERRORBUFFER, d_errbuf)))
226 throw BESInternalError(
string(
"CURL Error: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
// Tracing: route libcurl debug records to curl_trace and turn verbose on.
// NOTE(review): CURL_VERBOSE is defined as 0 at the top of the file; these
// two options are presumably wrapped in a CURL_VERBOSE conditional - confirm.
229 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_DEBUGFUNCTION, curl_trace)))
230 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
233 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_VERBOSE, 1L)))
234 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
// Received bytes are delivered to chunk_write_data.
238 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_WRITEFUNCTION, chunk_write_data)))
239 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
// TCP keep-alive settings (enable, 120 s idle, 120 s probe interval).
// NOTE(review): in curl.h the CURLOPT_* options are enumerators, not
// preprocessor macros, so these three #ifdef tests are false and the
// keep-alive code is never compiled. That would also hide two problems in it:
// curl_error_msg is called with one argument here (every other call passes
// res and an error buffer), and the last throw below has no terminating
// semicolon in this listing. A LIBCURL_VERSION_NUM check (keep-alive options
// were added in 7.25.0) is the usual way to gate this.
241 #ifdef CURLOPT_TCP_KEEPALIVE
243 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPALIVE, 1L)))
244 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__);
247 #ifdef CURLOPT_TCP_KEEPIDLE
249 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPIDLE, 120L)))
250 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__);
253 #ifdef CURLOPT_TCP_KEEPINTVL
255 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPINTVL, 120L)))
256 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__)
// Destructor: releases the libcurl easy handle and the HTTP header list,
// each only when the corresponding pointer is non-null.
264 dmrpp_easy_handle::~dmrpp_easy_handle()
266 if (d_handle) curl_easy_cleanup(d_handle);
267 if (d_headers) curl_slist_free_all(d_headers);
// Inspect the HTTP status of a finished transfer on easy handle `eh`.
// Reads CURLINFO_RESPONSE_CODE; if that query itself fails a BESInternalError
// is thrown. Returns bool; the serial read path in this file stores the
// result in `success`. The status-to-result mapping is not visible here.
278 static bool evaluate_curl_response(CURL* eh)
281 CURLcode res = curl_easy_getinfo(eh, CURLINFO_RESPONSE_CODE, &http_code);
282 if (CURLE_OK != res) {
// No error buffer is available in this function, so an empty string literal
// is passed (cast to char* to match the curl_error_msg parameter type).
283 throw BESInternalError(
string(
"Error getting HTTP response code: ").append(curl_error_msg(res, (
char*)
"")), __FILE__, __LINE__);
// Start of the message used for an unexpected status.
301 oss <<
"HTTP status error: Expected an OK status, but got: ";
// Body of what appears to be dmrpp_easy_handle::read_data(), the serial
// transfer path (signature line not visible in this listing).
// URLs that start with https:// or http:// go through a retry sequence that
// uses retry_limit and initial_retry_time; any other URL is performed once.
316 if (d_url.find(
"https://") == 0 || d_url.find(
"http://") == 0) {
317 unsigned int tries = 0;
319 unsigned int retry_time = initial_retry_time;
323 CURLcode curl_code = curl_easy_perform(d_handle);
// A transport-level failure is fatal: throw immediately.
326 if (CURLE_OK != curl_code) {
327 throw BESInternalError(
string(
"Data transfer error: ").append(curl_error_msg(curl_code, d_errbuf)),
// The transfer completed; the HTTP status decides success.
331 success = evaluate_curl_response(d_handle);
// NOTE(review): when this point is reached curl_code appears to be CURLE_OK
// (the throw above handles every other value), so the curl_error_msg() text
// appended below likely adds no detail about the HTTP failure - confirm.
334 if (tries == retry_limit) {
336 string(
"Data transfer error: Number of re-tries to S3 exceeded: ").append(
337 curl_error_msg(curl_code, d_errbuf)), __FILE__, __LINE__);
340 LOG(
"HTTP transfer 500 error, will retry (trial " << tries <<
" for: " << d_url <<
").");
// NOTE(review): the destructor also frees d_headers when it is non-null.
// Confirm d_headers is reset to 0 after this call (no reset is visible here),
// otherwise the list is freed twice.
346 curl_slist_free_all(d_headers);
// Non-HTTP(S) URL: a single perform with no retry.
351 CURLcode curl_code = curl_easy_perform(d_handle);
352 if (CURLE_OK != curl_code) {
353 throw BESInternalError(
string(
"Data transfer error: ").append(curl_error_msg(curl_code, d_errbuf)),
// Mark the chunk as read.
358 d_chunk->set_is_read(
true);
// Private implementation struct of dmrpp_multi_handle (reached via p_impl).
// It holds the list of easy handles queued for transfer; when
// HAVE_CURL_MULTI_API is set it also has conditional members (the other
// methods use p_impl->curlm; the declaration is not visible here).
368 struct dmrpp_multi_handle::multi_handle {
369 #if HAVE_CURL_MULTI_API
372 std::vector<dmrpp_easy_handle *> ehandles;
// Constructor: allocates the implementation struct and, when the libcurl
// multi API is available, creates the multi handle.
// NOTE(review): the curl_multi_init() result is not checked in the visible
// lines - confirm a null check exists.
376 dmrpp_multi_handle::dmrpp_multi_handle()
378 p_impl =
new multi_handle;
379 #if HAVE_CURL_MULTI_API
380 p_impl->curlm = curl_multi_init();
// Destructor: releases the libcurl multi handle when the multi API is in use.
// NOTE(review): p_impl is allocated with new in the constructor; a matching
// delete is not visible in this listing - confirm it exists.
384 dmrpp_multi_handle::~dmrpp_multi_handle()
386 #if HAVE_CURL_MULTI_API
387 curl_multi_cleanup(p_impl->curlm);
// Body of what appears to be dmrpp_multi_handle::add_easy_handle(eh)
// (signature line not visible). With the multi API the underlying CURL handle
// is registered with the multi handle; the wrapper is also appended to
// ehandles (whether that happens in both build variants is not visible).
// NOTE(review): the curl_multi_add_handle() return code is not checked in
// the visible lines.
402 #if HAVE_CURL_MULTI_API
403 curl_multi_add_handle(p_impl->curlm, eh->d_handle);
405 p_impl->ehandles.push_back(eh);
// Thread entry point, compiled only when the libcurl multi API is NOT
// available; it is the start routine passed to pthread_create() in the
// parallel read path of this file. `handle` is the void* thread argument.
// On an exception the verbose message is copied into a heap-allocated string
// - presumably returned as the thread result so the joining code can report
// and free it; confirm against the join loop.
411 #if !HAVE_CURL_MULTI_API
412 static void *easy_handle_read_data(
void *handle)
421 string *error =
new string(e.get_verbose_message());
// Body of what appears to be dmrpp_multi_handle::read_data(), the parallel
// transfer path (signature line not visible in this listing).
// With the libcurl multi API: drive all queued transfers to completion, then
// inspect each completion message. Without it: run one pthread per easy
// handle and join them. Either way the ehandles list is cleared at the end.
441 #if HAVE_CURL_MULTI_API
444 int still_running = 0;
445 CURLMcode mres = curl_multi_perform(p_impl->curlm, &still_running);
446 if (mres != CURLM_OK)
447 throw BESInternalError(
string(
"Could not initiate data read: ").append(curl_multi_strerror(mres)), __FILE__,
// Wait (up to MAX_WAIT_MSECS per call) for activity, then let libcurl make
// progress; repeat until no transfer is still running.
452 mres = curl_multi_wait(p_impl->curlm, NULL, 0, MAX_WAIT_MSECS, &numfds);
453 if (mres != CURLM_OK)
454 throw BESInternalError(
string(
"Could not wait on data read: ").append(curl_multi_strerror(mres)), __FILE__,
457 mres = curl_multi_perform(p_impl->curlm, &still_running);
458 if (mres != CURLM_OK)
459 throw BESInternalError(
string(
"Could not iterate data read: ").append(curl_multi_strerror(mres)), __FILE__,
462 }
while (still_running);
// Drain the completion queue and check every finished transfer.
466 while ((msg = curl_multi_info_read(p_impl->curlm, &msgs_left))) {
467 if (msg->msg == CURLMSG_DONE) {
468 CURL *eh = msg->easy_handle;
470 CURLcode res = msg->data.result;
472 throw BESInternalError(
string(
"Error HTTP: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
480 throw BESInternalError(
string(
"Could not access easy handle: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
// NOTE(review): evaluate_curl_response() returns bool and the serial path
// uses a false result to retry; here the return value is discarded - confirm
// that is intended.
486 evaluate_curl_response(eh);
495 mres = curl_multi_remove_handle(p_impl->curlm, eh);
496 if (mres != CURLM_OK)
497 throw BESInternalError(
string(
"Could not remove libcurl handle: ").append(curl_multi_strerror(mres)), __FILE__, __LINE__);
502 throw BESInternalError(
"Error getting HTTP or FILE responses.", __FILE__, __LINE__);
// Fallback without the multi API: one thread per easy handle.
// NOTE(review): `threads` is a variable-length array, which is a compiler
// extension in C++ and has zero length when no handles are queued; a
// std::vector<pthread_t> would be the portable form.
508 pthread_t threads[p_impl->ehandles.size()];
509 unsigned int num_threads = 0;
511 for (
unsigned int i = 0; i < p_impl->ehandles.size(); ++i) {
512 int status = pthread_create(&threads[i], NULL, easy_handle_read_data, (
void*) p_impl->ehandles[i]);
517 ostringstream oss(
"Could not start process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
518 oss << i <<
": " << strerror(status);
// Join the started threads; a thread may hand back an error object through
// its exit value (handling not fully visible here).
524 for (
unsigned int i = 0; i < num_threads; ++i) {
526 int status = pthread_join(threads[i], (
void**) &error);
528 ostringstream oss(
"Could not join process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
529 oss << i <<
": " << strerror(status);
532 else if (error != 0) {
540 join_threads(threads, num_threads);
545 p_impl->ehandles.clear();
// Constructor: starts with no multi handle, takes the pool size from
// DmrppRequestHandler::d_max_parallel_transfers, loops once per pool slot
// (the loop body is not visible here - presumably it creates the easy
// handles), and initializes the mutex that guards handle hand-out.
// Throws BESInternalError if the mutex cannot be initialized.
549 CurlHandlePool::CurlHandlePool() : d_multi_handle(0)
551 d_max_easy_handles = DmrppRequestHandler::d_max_parallel_transfers;
554 for (
unsigned int i = 0; i < d_max_easy_handles; ++i) {
558 if (pthread_mutex_init(&d_get_easy_handle_mutex, 0) != 0)
559 throw BESInternalError(
"Could not initialize mutex in CurlHandlePool", __FILE__, __LINE__);
// Append one HTTP header to a libcurl string list.
//   slist  - existing list (callers in this file pass 0 to start a new one).
//   header - header name; callers include the trailing colon.
//   value  - header value.
// The line added is header, a single space, then value. curl_slist_append()
// copies the string, so the local buffer does not need to outlive the call.
// Callers in this file treat a null result as failure.
573 static struct curl_slist *
574 append_http_header(curl_slist *slist,
const string &header,
const string &value)
576 string full_header = header;
577 full_header.append(
" ").append(value);
579 struct curl_slist *temp = curl_slist_append(slist, full_header.c_str());
// True when `url` contains the substring cloudyopendap anywhere; that is the
// only test applied.
586 url_has_credentials(
const string &url)
588 return (url.find(
"cloudyopendap") != string::npos);
// Decide whether a request for `url` needs signing. The visible test checks
// that the URL starts with http:// or https://; what happens inside that
// branch is not visible in this listing.
592 url_must_be_signed(
const string &url)
595 if(url.find(
"http://") == 0 || url.find(
"https://") == 0){
// Value type bundling the four strings needed to sign an S3 request: public
// key, secret key, region and bucket name (member declarations are not
// visible in this listing; the names come from the initializer lists below).
607 struct aws_credentials {
613 map<string,map<string,string>> credentials;
// Default: all four strings empty.
616 aws_credentials(): public_key(
""), secret_key(
""), region(
""), bucket_name(
"") {}
// Construct from explicit key, secret, region and bucket values.
618 aws_credentials(
const string &p_key,
const string &s_key,
const string &r,
const string &b)
619 : public_key(p_key), secret_key(s_key), region(r), bucket_name(b) {}
// Copy constructor.
// NOTE(review): only the four strings are copied; the `credentials` map
// above is not in the initializer list, so the copy gets an empty map.
621 aws_credentials(
const aws_credentials &rhs)
622 : public_key(rhs.public_key), secret_key(rhs.secret_key), region(rhs.region), bucket_name(rhs.bucket_name) {}
// Look up credentials for a URL; defined later in this file.
624 unique_ptr<aws_credentials> get(
const string &url);
// Read environment variable `key` with getenv() and report it through the
// `value` out-parameter (the assignment itself is not visible here -
// presumably made only when getenv() returns non-null; confirm).
// NOTE(review): the value is written to the debug log, and this function is
// called for AWS_SECRET_ACCESS_KEY - consider not logging the value.
627 void get_from_env(
const string &key,
string &value){
628 const char *cstr = getenv(key.c_str());
631 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" From system environment - " << key <<
": " << value << endl);
// Counterpart of get_from_env for the BES configuration: reports the setting
// named `key` through `value`. The lookup call is not visible in this listing
// (the debug text says the value comes from TheBESKeys); key_found starts
// false and is presumably set by that lookup.
638 void get_from_config(
const string &key,
string &value){
639 bool key_found=
false;
642 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" Using " << key <<
" from TheBESKeys" << endl);
// Collect AWS settings from the local environment, falling back to the BES
// configuration. All four parameters are out-parameters:
//   aws_akid      - AWS_ACCESS_KEY_ID
//   aws_sak       - AWS_SECRET_ACCESS_KEY
//   aws_region    - AWS_REGION
//   aws_s3_bucket - AWS_S3_BUCKET
// Each is first read from the environment variable of that name. If the
// result is non-empty the environment value is kept; the configuration key
// DMRPP.<NAME> is consulted otherwise (the else keywords are not visible in
// this listing but the pairing below implies them).
// NOTE(review): the secret access key is streamed into BESDEBUG output twice
// in this function; consider masking it.
649 void get_creds_from_local(
string &aws_akid,
string &aws_sak,
string &aws_region,
string &aws_s3_bucket ){
// Configuration keys are the environment names prefixed with DMRPP and a dot.
651 const string KEYS_CONFIG_PREFIX(
"DMRPP");
653 const string ENV_AKID_KEY(
"AWS_ACCESS_KEY_ID");
654 const string CONFIG_AKID_KEY(KEYS_CONFIG_PREFIX+
"."+ENV_AKID_KEY);
656 const string ENV_SAK_KEY(
"AWS_SECRET_ACCESS_KEY");
657 const string CONFIG_SAK_KEY(KEYS_CONFIG_PREFIX+
"."+ENV_SAK_KEY);
659 const string ENV_REGION_KEY(
"AWS_REGION");
660 const string CONFIG_REGION_KEY(KEYS_CONFIG_PREFIX+
"."+ENV_REGION_KEY);
662 const string ENV_S3_BUCKET_KEY(
"AWS_S3_BUCKET");
663 const string CONFIG_S3_BUCKET_KEY(KEYS_CONFIG_PREFIX+
"."+ENV_S3_BUCKET_KEY);
// Pass 1: environment.
670 get_from_env(ENV_AKID_KEY,aws_akid);
671 get_from_env(ENV_SAK_KEY,aws_sak);
672 get_from_env(ENV_REGION_KEY,aws_region);
673 get_from_env(ENV_S3_BUCKET_KEY,aws_s3_bucket);
675 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__
676 <<
" From ENV aws_akid: '" << aws_akid <<
"' "
677 <<
"aws_sak: '" << aws_sak <<
"' "
678 <<
"aws_region: '" << aws_region <<
"' "
679 <<
"aws_s3_bucket: '" << aws_s3_bucket <<
"' "
// Pass 2: per-setting fallback to the configuration.
688 if(aws_akid.length()){
689 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" Using " << ENV_AKID_KEY <<
" from the environment." << endl);
692 get_from_config(CONFIG_AKID_KEY,aws_akid);
695 if(aws_sak.length()){
696 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" Using " << ENV_SAK_KEY <<
" from the environment." << endl);
699 get_from_config(CONFIG_SAK_KEY,aws_sak);
702 if(aws_region.length()){
703 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" Using " << ENV_REGION_KEY <<
" from the environment." << endl);
706 get_from_config(CONFIG_REGION_KEY,aws_region);
709 if(aws_s3_bucket.length()){
710 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" Using " << ENV_S3_BUCKET_KEY <<
" from the environment." << endl);
713 get_from_config(CONFIG_S3_BUCKET_KEY,aws_s3_bucket);
// Final values, logged for debugging.
716 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__
717 <<
" END aws_akid: '" << aws_akid <<
"' "
718 <<
"aws_sak: '" << aws_sak <<
"' "
719 <<
"aws_region: '" << aws_region <<
"' "
720 <<
"aws_s3_bucket: '" << aws_s3_bucket <<
"' "
// Return the credentials to use for `url`, as a newly allocated object owned
// by the returned unique_ptr.
// URLs containing cloudyopendap get the locally configured values (see
// get_creds_from_local); the other visible path builds an object with four
// empty strings.
// NOTE(review): the secret access key is included in the BESDEBUG output
// below; consider masking it.
724 unique_ptr<aws_credentials>
725 aws_credentials::get(
const string &url)
732 string aws_s3_bucket;
734 if (url.find(
"cloudyopendap") != string::npos) {
736 get_creds_from_local(aws_akid, aws_sak, aws_region, aws_s3_bucket);
737 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__
738 <<
" aws_akid: " << aws_akid
739 <<
" aws_sak: " << aws_sak
740 <<
" aws_region: " << aws_region
741 <<
" aws_s3_bucket: " << aws_s3_bucket
744 unique_ptr<aws_credentials> creds(
new aws_credentials(aws_akid, aws_sak, aws_region, aws_s3_bucket));
747 unique_ptr<aws_credentials> creds(
new aws_credentials(
"",
"",
"",
""));
// Body of what appears to be CurlHandlePool::get_easy_handle(chunk)
// (signature line not visible in this listing). Steps visible below:
//   1. reject chunk URLs that match no white-list rule;
//   2. under the pool mutex, scan d_easy_handles for one not in use;
//   3. mark it in use and bind it to the chunk (URL, range, write target,
//      private pointer);
//   4. when S3 credentials are available, attach AWS V4 signing headers.
772 if(!WhiteList::get_white_list()->is_white_listed(chunk->
get_data_url())){
773 string msg =
"ERROR!! The chunk url " + chunk->
get_data_url() +
" does not match any white-list rule. ";
// Serialize access to the pool while a free handle is selected.
777 Lock lock(d_get_easy_handle_mutex);
780 for (vector<dmrpp_easy_handle *>::iterator i = d_easy_handles.begin(), e = d_easy_handles.end(); i != e; ++i) {
781 if (!(*i)->d_in_use) {
// NOTE(review): the handle is marked in use before the setopt calls below,
// each of which can throw; no reset of d_in_use on those error paths is
// visible here - confirm the handle cannot be leaked from the pool.
789 handle->d_in_use =
true;
792 handle->d_chunk = chunk;
794 CURLcode res = curl_easy_setopt(handle->d_handle, CURLOPT_URL, chunk->
get_data_url().c_str());
795 if (res != CURLE_OK)
throw BESInternalError(
string(
"HTTP Error setting URL: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__, __LINE__);
799 throw BESInternalError(
string(
"HTTP Error setting Range: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__,
// The chunk is the user pointer handed to the write callback.
803 if (CURLE_OK != (res = curl_easy_setopt(handle->d_handle, CURLOPT_WRITEDATA,
reinterpret_cast<void*
>(chunk))))
804 throw BESInternalError(
string(
"CURL Error setting chunk as data buffer: ").append(curl_error_msg(res, handle->d_errbuf)),
// Store the wrapper on the CURL handle so it can be retrieved from the raw
// handle later.
808 if (CURLE_OK != (res = curl_easy_setopt(handle->d_handle, CURLOPT_PRIVATE,
reinterpret_cast<void*
>(handle))))
809 throw BESInternalError(
string(
"CURL Error setting easy_handle as private data: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__,
// S3 signing.
// NOTE(review): to_json() output is written to the debug log; if it includes
// the secret key this leaks it - confirm.
813 if ( credentials && credentials->
isS3Cred()) {
814 BESDEBUG(MODULE,
"Got AccessCredentials instance: "<< endl << credentials->to_json() << endl );
// One timestamp is used for both the signature and the x-amz-date header.
817 const std::time_t request_time = std::time(0);
819 const std::string auth_header =
820 AWSV4::compute_awsv4_signature(
823 credentials->
get(AccessCredentials::ID_KEY),
824 credentials->
get(AccessCredentials::KEY_KEY),
825 credentials->
get(AccessCredentials::REGION_KEY),
// NOTE(review): the three header-append failures below format their message
// with curl_error_msg(res, ...), but res still holds the result of the last
// successful curl_easy_setopt call, so the appended text will not describe
// the append failure.
832 handle->d_headers = append_http_header(0,
"Authorization:", auth_header);
833 if (!handle->d_headers)
835 string(
"CURL Error setting Authorization header: ").append(
836 curl_error_msg(res, handle->d_errbuf)), __FILE__, __LINE__);
// The literal below is the SHA-256 digest of an empty payload.
839 curl_slist *temp = append_http_header(handle->d_headers,
"x-amz-content-sha256:",
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855");
842 string(
"CURL Error setting x-amz-content-sha256: ").append(curl_error_msg(res, handle->d_errbuf)),
844 handle->d_headers = temp;
846 temp = append_http_header(handle->d_headers,
"x-amz-date:", AWSV4::ISO8601_date(request_time));
849 string(
"CURL Error setting x-amz-date header: ").append(curl_error_msg(res, handle->d_errbuf)),
851 handle->d_headers = temp;
// Hand the finished header list to libcurl.
854 if (CURLE_OK != (res = curl_easy_setopt(handle->d_handle, CURLOPT_HTTPHEADER, handle->d_headers)))
855 throw BESInternalError(
string(
"CURL Error setting HTTP headers for S3 authentication: ").append(
856 curl_error_msg(res, handle->d_errbuf)), __FILE__, __LINE__);
// Body of what appears to be CurlHandlePool::release_handle(h) (signature
// line not visible in this listing). Under the pool mutex the handle is
// marked as no longer in use; a scan over d_easy_handles logs the index of
// the matching entry at debug level.
878 Lock lock(d_get_easy_handle_mutex);
883 handle->d_in_use =
false;
887 for (std::vector<dmrpp_easy_handle *>::iterator i = d_easy_handles.begin(), e = d_easy_handles.end(); i != e; ++i) {
889 BESDEBUG(
"dmrpp:5",
"Found a handle match for the " << i - d_easy_handles.begin() <<
"th easy handle." << endl);
std::string get(const std::string &key)
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
Abstract exception class for the BES with basic string message.
error thrown if the BES is not allowed to access the resource requested
exception thrown if internal error encountered
AccessCredentials * get(const std::string &url)
void get_value(const std::string &s, std::string &val, bool &found)
Retrieve the value of a given key, if set.
static TheBESKeys * TheKeys()
virtual std::string get_curl_range_arg_string()
Returns a curl range argument. The libcurl requires a string argument for range-get activities,...
virtual std::string get_data_url() const
Get the data url string for this Chunk's data block.
void release_handle(dmrpp_easy_handle *h)
dmrpp_easy_handle * get_easy_handle(Chunk *chunk)
Bundle a libcurl easy handle to other information.
void read_data()
This is the read_data() method for serial transfers.
dmrpp_easy_handle()
Build a string with hex info about stuff libcurl gets.
Encapsulate a libcurl multi handle.
void add_easy_handle(dmrpp_easy_handle *eh)
Add an Easy Handle to a Multi Handle object.
void read_data()
The read_data() method for parallel transfers.