35 #include <curl/curl.h>
38 #include <curl/multi.h>
47 #include "BESInternalError.h"
48 #include "BESForbiddenError.h"
49 #include <TheBESKeys.h>
50 #include "WhiteList.h"
52 #include "DmrppRequestHandler.h"
53 #include "DmrppCommon.h"
55 #include "CurlHandlePool.h"
57 #include "CredentialsManager.h"
// Compile-time switch: reuse libcurl easy handles (1) or not (0).
59 #define KEEP_ALIVE 1 // Reuse libcurl easy handles (1) or not (0).
// Compile-time switch for routing libcurl trace/info output to the bes.log.
// NOTE(review): presumably tested with #if elsewhere in the file (not visible in this excerpt) — confirm.
61 #define CURL_VERBOSE 0 // Logs curl info to the bes.log
// Timeout, in milliseconds, passed to curl_multi_wait() each time the multi
// handle is polled while transfers are still running (30 seconds).
63 static const int MAX_WAIT_MSECS = 30*1000;
// Number of attempts after which a retried HTTP(S) transfer gives up and
// throws "Number of re-tries to S3 exceeded" (compared against the tries counter).
64 static const unsigned int retry_limit = 10;
// Initial value of the per-transfer retry delay (retry_time).
// NOTE(review): units are not visible in this excerpt (presumably microseconds for usleep) — confirm.
65 static const unsigned int initial_retry_time = 1000;
67 using namespace dmrpp;
// Debug context name used by the BESDEBUG statements in this file.
71 #define MODULE "dmrpp:curl_handle_pool"
73 Lock::Lock(pthread_mutex_t &lock) : m_mutex(lock)
75 int status = pthread_mutex_lock(&m_mutex);
76 if (status != 0)
throw BESInternalError(
"Could not lock in CurlHandlePool", __FILE__, __LINE__);
81 int status = pthread_mutex_unlock(&m_mutex);
83 ERROR(
"Could not unlock in CurlHandlePool");
90 curl_error_msg(CURLcode res,
char *errbuf)
93 size_t len = strlen(errbuf);
96 oss <<
" (code: " << (int)res <<
")";
99 oss << curl_easy_strerror(res) <<
"(result: " << res <<
")";
112 string dump(
const char *text,
unsigned char *ptr,
size_t size)
116 unsigned int width=0x10;
119 oss << text <<
", " << std::setw(10) << (long)size << std::setbase(16) << (long)size << endl;
121 for(i=0; i<size; i+= width) {
122 oss << std::setw(4) << (long)i;
126 for(c = 0; c < width; c++) {
128 oss << std::setw(2) << ptr[i+c];
138 for(c = 0; (c < width) && (i+c < size); c++) {
139 char x = (ptr[i+c] >= 0x20 && ptr[i+c] < 0x80) ? ptr[i+c] :
'.';
141 oss << std::setw(1) << x;
159 int curl_trace(CURL *, curl_infotype type,
char *data,
size_t ,
void *)
165 case CURLINFO_HEADER_OUT:
166 case CURLINFO_HEADER_IN: {
169 while ((pos = text.find(
'\n')) != string::npos)
170 text = text.substr(0, pos);
175 case CURLINFO_DATA_OUT:
176 case CURLINFO_SSL_DATA_OUT:
177 case CURLINFO_DATA_IN:
178 case CURLINFO_SSL_DATA_IN:
186 LOG(
"libcurl == Info: " << text << endl);
189 case CURLINFO_HEADER_OUT:
190 LOG(
"libcurl == Send header: " << text << endl);
192 case CURLINFO_HEADER_IN:
193 LOG(
"libcurl == Recv header: " << text << endl);
197 case CURLINFO_DATA_OUT:
198 LOG(
"libcurl == Send data" << text << endl);
200 case CURLINFO_SSL_DATA_OUT:
201 LOG(
"libcurl == Send SSL data" << text << endl);
203 case CURLINFO_DATA_IN:
204 LOG(
"libcurl == Recv data" << text << endl);
206 case CURLINFO_SSL_DATA_IN:
207 LOG(
"libcurl == Recv SSL data" << text << endl);
220 d_handle = curl_easy_init();
221 if (!d_handle)
throw BESInternalError(
"Could not allocate CURL handle", __FILE__, __LINE__);
225 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_ERRORBUFFER, d_errbuf)))
226 throw BESInternalError(
string(
"CURL Error: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
229 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_DEBUGFUNCTION, curl_trace)))
230 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
233 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_VERBOSE, 1L)))
234 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
238 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_WRITEFUNCTION, chunk_write_data)))
239 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res, d_errbuf)), __FILE__, __LINE__);
241 #ifdef CURLOPT_TCP_KEEPALIVE
243 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPALIVE, 1L)))
244 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__);
247 #ifdef CURLOPT_TCP_KEEPIDLE
249 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPIDLE, 120L)))
250 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__);
253 #ifdef CURLOPT_TCP_KEEPINTVL
255 if (CURLE_OK != (res = curl_easy_setopt(d_handle, CURLOPT_TCP_KEEPINTVL, 120L)))
256 throw BESInternalError(
string(
"CURL Error: ").append(curl_error_msg(res)), __FILE__, __LINE__)
264 dmrpp_easy_handle::~dmrpp_easy_handle()
266 if (d_handle) curl_easy_cleanup(d_handle);
267 if (d_headers) curl_slist_free_all(d_headers);
278 static bool evaluate_curl_response(CURL* eh)
281 CURLcode res = curl_easy_getinfo(eh, CURLINFO_RESPONSE_CODE, &http_code);
282 if (CURLE_OK != res) {
283 throw BESInternalError(
string(
"Error getting HTTP response code: ").append(curl_error_msg(res, (
char*)
"")), __FILE__, __LINE__);
301 oss <<
"HTTP status error: Expected an OK status, but got: ";
316 if (d_url.find(
"https://") == 0 || d_url.find(
"http://") == 0) {
317 unsigned int tries = 0;
319 unsigned int retry_time = initial_retry_time;
323 CURLcode curl_code = curl_easy_perform(d_handle);
326 if (CURLE_OK != curl_code) {
327 throw BESInternalError(
string(
"Data transfer error: ").append(curl_error_msg(curl_code, d_errbuf)),
331 success = evaluate_curl_response(d_handle);
334 if (tries == retry_limit) {
336 string(
"Data transfer error: Number of re-tries to S3 exceeded: ").append(
337 curl_error_msg(curl_code, d_errbuf)), __FILE__, __LINE__);
340 LOG(
"HTTP transfer 500 error, will retry (trial " << tries <<
" for: " << d_url <<
").");
346 curl_slist_free_all(d_headers);
351 CURLcode curl_code = curl_easy_perform(d_handle);
352 if (CURLE_OK != curl_code) {
353 throw BESInternalError(
string(
"Data transfer error: ").append(curl_error_msg(curl_code, d_errbuf)),
358 d_chunk->set_is_read(
true);
368 struct dmrpp_multi_handle::multi_handle {
369 #if HAVE_CURL_MULTI_API
372 std::vector<dmrpp_easy_handle *> ehandles;
376 dmrpp_multi_handle::dmrpp_multi_handle()
378 p_impl =
new multi_handle;
379 #if HAVE_CURL_MULTI_API
380 p_impl->curlm = curl_multi_init();
384 dmrpp_multi_handle::~dmrpp_multi_handle()
386 #if HAVE_CURL_MULTI_API
387 curl_multi_cleanup(p_impl->curlm);
402 #if HAVE_CURL_MULTI_API
403 curl_multi_add_handle(p_impl->curlm, eh->d_handle);
405 p_impl->ehandles.push_back(eh);
411 #if !HAVE_CURL_MULTI_API
412 static void *easy_handle_read_data(
void *handle)
421 string *error =
new string(e.get_verbose_message());
441 #if HAVE_CURL_MULTI_API
444 int still_running = 0;
445 CURLMcode mres = curl_multi_perform(p_impl->curlm, &still_running);
446 if (mres != CURLM_OK)
447 throw BESInternalError(
string(
"Could not initiate data read: ").append(curl_multi_strerror(mres)), __FILE__,
452 mres = curl_multi_wait(p_impl->curlm, NULL, 0, MAX_WAIT_MSECS, &numfds);
453 if (mres != CURLM_OK)
454 throw BESInternalError(
string(
"Could not wait on data read: ").append(curl_multi_strerror(mres)), __FILE__,
457 mres = curl_multi_perform(p_impl->curlm, &still_running);
458 if (mres != CURLM_OK)
459 throw BESInternalError(
string(
"Could not iterate data read: ").append(curl_multi_strerror(mres)), __FILE__,
462 }
while (still_running);
466 while ((msg = curl_multi_info_read(p_impl->curlm, &msgs_left))) {
467 if (msg->msg == CURLMSG_DONE) {
468 CURL *eh = msg->easy_handle;
470 CURLcode res = msg->data.result;
472 throw BESInternalError(
string(
"Error HTTP: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
480 throw BESInternalError(
string(
"Could not access easy handle: ").append(curl_easy_strerror(res)), __FILE__, __LINE__);
486 evaluate_curl_response(eh);
495 mres = curl_multi_remove_handle(p_impl->curlm, eh);
496 if (mres != CURLM_OK)
497 throw BESInternalError(
string(
"Could not remove libcurl handle: ").append(curl_multi_strerror(mres)), __FILE__, __LINE__);
502 throw BESInternalError(
"Error getting HTTP or FILE responses.", __FILE__, __LINE__);
508 pthread_t threads[p_impl->ehandles.size()];
509 unsigned int num_threads = 0;
511 for (
unsigned int i = 0; i < p_impl->ehandles.size(); ++i) {
512 int status = pthread_create(&threads[i], NULL, easy_handle_read_data, (
void*) p_impl->ehandles[i]);
517 ostringstream oss(
"Could not start process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
518 oss << i <<
": " << strerror(status);
524 for (
unsigned int i = 0; i < num_threads; ++i) {
526 int status = pthread_join(threads[i], (
void**) &error);
528 ostringstream oss(
"Could not join process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
529 oss << i <<
": " << strerror(status);
532 else if (error != 0) {
540 join_threads(threads, num_threads);
545 p_impl->ehandles.clear();
549 CurlHandlePool::CurlHandlePool() : d_multi_handle(0)
551 d_max_easy_handles = DmrppRequestHandler::d_max_parallel_transfers;
554 for (
unsigned int i = 0; i < d_max_easy_handles; ++i) {
558 if (pthread_mutex_init(&d_get_easy_handle_mutex, 0) != 0)
559 throw BESInternalError(
"Could not initialize mutex in CurlHandlePool", __FILE__, __LINE__);
573 static struct curl_slist *
574 append_http_header(curl_slist *slist,
const string &header,
const string &value)
576 string full_header = header;
577 full_header.append(
" ").append(value);
579 struct curl_slist *temp = curl_slist_append(slist, full_header.c_str());
586 url_has_credentials(
const string &url)
588 return (url.find(
"cloudyopendap") != string::npos);
592 url_must_be_signed(
const string &url)
595 if(url.find(
"http://") == 0 || url.find(
"https://") == 0){
607 struct aws_credentials {
613 map<string,map<string,string>> credentials;
616 aws_credentials(): public_key(
""), secret_key(
""), region(
""), bucket_name(
"") {}
618 aws_credentials(
const string &p_key,
const string &s_key,
const string &r,
const string &b)
619 : public_key(p_key), secret_key(s_key), region(r), bucket_name(b) {}
621 aws_credentials(
const aws_credentials &rhs)
622 : public_key(rhs.public_key), secret_key(rhs.secret_key), region(rhs.region), bucket_name(rhs.bucket_name) {}
624 unique_ptr<aws_credentials> get(
const string &url);
627 void get_from_env(
const string &key,
string &value){
628 const char *cstr = getenv(key.c_str());
631 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" From system environment - " << key <<
": " << value << endl);
638 void get_from_config(
const string &key,
string &value){
639 bool key_found=
false;
642 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" Using " << key <<
" from TheBESKeys" << endl);
649 void get_creds_from_local(
string &aws_akid,
string &aws_sak,
string &aws_region,
string &aws_s3_bucket ){
651 const string KEYS_CONFIG_PREFIX(
"DMRPP");
653 const string ENV_AKID_KEY(
"AWS_ACCESS_KEY_ID");
654 const string CONFIG_AKID_KEY(KEYS_CONFIG_PREFIX+
"."+ENV_AKID_KEY);
656 const string ENV_SAK_KEY(
"AWS_SECRET_ACCESS_KEY");
657 const string CONFIG_SAK_KEY(KEYS_CONFIG_PREFIX+
"."+ENV_SAK_KEY);
659 const string ENV_REGION_KEY(
"AWS_REGION");
660 const string CONFIG_REGION_KEY(KEYS_CONFIG_PREFIX+
"."+ENV_REGION_KEY);
662 const string ENV_S3_BUCKET_KEY(
"AWS_S3_BUCKET");
663 const string CONFIG_S3_BUCKET_KEY(KEYS_CONFIG_PREFIX+
"."+ENV_S3_BUCKET_KEY);
670 get_from_env(ENV_AKID_KEY,aws_akid);
671 get_from_env(ENV_SAK_KEY,aws_sak);
672 get_from_env(ENV_REGION_KEY,aws_region);
673 get_from_env(ENV_S3_BUCKET_KEY,aws_s3_bucket);
675 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__
676 <<
" From ENV aws_akid: '" << aws_akid <<
"' "
677 <<
"aws_sak: '" << aws_sak <<
"' "
678 <<
"aws_region: '" << aws_region <<
"' "
679 <<
"aws_s3_bucket: '" << aws_s3_bucket <<
"' "
688 if(aws_akid.length()){
689 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" Using " << ENV_AKID_KEY <<
" from the environment." << endl);
692 get_from_config(CONFIG_AKID_KEY,aws_akid);
695 if(aws_sak.length()){
696 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" Using " << ENV_SAK_KEY <<
" from the environment." << endl);
699 get_from_config(CONFIG_SAK_KEY,aws_sak);
702 if(aws_region.length()){
703 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" Using " << ENV_REGION_KEY <<
" from the environment." << endl);
706 get_from_config(CONFIG_REGION_KEY,aws_region);
709 if(aws_s3_bucket.length()){
710 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__ <<
" Using " << ENV_S3_BUCKET_KEY <<
" from the environment." << endl);
713 get_from_config(CONFIG_S3_BUCKET_KEY,aws_s3_bucket);
716 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__
717 <<
" END aws_akid: '" << aws_akid <<
"' "
718 <<
"aws_sak: '" << aws_sak <<
"' "
719 <<
"aws_region: '" << aws_region <<
"' "
720 <<
"aws_s3_bucket: '" << aws_s3_bucket <<
"' "
724 unique_ptr<aws_credentials>
725 aws_credentials::get(
const string &url)
732 string aws_s3_bucket;
734 if (url.find(
"cloudyopendap") != string::npos) {
736 get_creds_from_local(aws_akid, aws_sak, aws_region, aws_s3_bucket);
737 BESDEBUG(MODULE, __FILE__ <<
" " << __LINE__
738 <<
" aws_akid: " << aws_akid
739 <<
" aws_sak: " << aws_sak
740 <<
" aws_region: " << aws_region
741 <<
" aws_s3_bucket: " << aws_s3_bucket
744 unique_ptr<aws_credentials> creds(
new aws_credentials(aws_akid, aws_sak, aws_region, aws_s3_bucket));
747 unique_ptr<aws_credentials> creds(
new aws_credentials(
"",
"",
"",
""));
772 if(!WhiteList::get_white_list()->is_white_listed(chunk->
get_data_url())){
773 string msg =
"ERROR!! The chunk url " + chunk->
get_data_url() +
" does not match any white-list rule. ";
777 Lock lock(d_get_easy_handle_mutex);
780 for (vector<dmrpp_easy_handle *>::iterator i = d_easy_handles.begin(), e = d_easy_handles.end(); i != e; ++i) {
781 if (!(*i)->d_in_use) {
789 handle->d_in_use =
true;
792 handle->d_chunk = chunk;
794 CURLcode res = curl_easy_setopt(handle->d_handle, CURLOPT_URL, chunk->
get_data_url().c_str());
795 if (res != CURLE_OK)
throw BESInternalError(
string(
"HTTP Error setting URL: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__, __LINE__);
799 throw BESInternalError(
string(
"HTTP Error setting Range: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__,
803 if (CURLE_OK != (res = curl_easy_setopt(handle->d_handle, CURLOPT_WRITEDATA,
reinterpret_cast<void*
>(chunk))))
804 throw BESInternalError(
string(
"CURL Error setting chunk as data buffer: ").append(curl_error_msg(res, handle->d_errbuf)),
808 if (CURLE_OK != (res = curl_easy_setopt(handle->d_handle, CURLOPT_PRIVATE,
reinterpret_cast<void*
>(handle))))
809 throw BESInternalError(
string(
"CURL Error setting easy_handle as private data: ").append(curl_error_msg(res, handle->d_errbuf)), __FILE__,
813 if ( credentials && credentials->
isS3Cred()) {
814 BESDEBUG(MODULE,
"Got AccessCredentials instance: "<< endl << credentials->to_json() << endl );
817 const std::time_t request_time = std::time(0);
819 const std::string auth_header =
820 AWSV4::compute_awsv4_signature(
823 credentials->
get(AccessCredentials::ID_KEY),
824 credentials->
get(AccessCredentials::KEY_KEY),
825 credentials->
get(AccessCredentials::REGION_KEY),
832 handle->d_headers = append_http_header(0,
"Authorization:", auth_header);
833 if (!handle->d_headers)
835 string(
"CURL Error setting Authorization header: ").append(
836 curl_error_msg(res, handle->d_errbuf)), __FILE__, __LINE__);
839 curl_slist *temp = append_http_header(handle->d_headers,
"x-amz-content-sha256:",
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855");
842 string(
"CURL Error setting x-amz-content-sha256: ").append(curl_error_msg(res, handle->d_errbuf)),
844 handle->d_headers = temp;
846 temp = append_http_header(handle->d_headers,
"x-amz-date:", AWSV4::ISO8601_date(request_time));
849 string(
"CURL Error setting x-amz-date header: ").append(curl_error_msg(res, handle->d_errbuf)),
851 handle->d_headers = temp;
854 if (CURLE_OK != (res = curl_easy_setopt(handle->d_handle, CURLOPT_HTTPHEADER, handle->d_headers)))
855 throw BESInternalError(
string(
"CURL Error setting HTTP headers for S3 authentication: ").append(
856 curl_error_msg(res, handle->d_errbuf)), __FILE__, __LINE__);
878 Lock lock(d_get_easy_handle_mutex);
883 handle->d_in_use =
false;
887 for (std::vector<dmrpp_easy_handle *>::iterator i = d_easy_handles.begin(), e = d_easy_handles.end(); i != e; ++i) {
889 BESDEBUG(
"dmrpp:5",
"Found a handle match for the " << i - d_easy_handles.begin() <<
"th easy handle." << endl);