diff --git a/ci/do_ci.sh b/ci/do_ci.sh index 41aaa6b54f..589d826afa 100755 --- a/ci/do_ci.sh +++ b/ci/do_ci.sh @@ -169,6 +169,9 @@ elif [[ "$1" == "code.coverage" ]]; then make make test lcov --directory $PWD --capture --output-file coverage.info + # removing test http server coverage from the total coverage. We don't use this server completely. + lcov --remove coverage.info '*/ext/http/server/*'> tmp_coverage.info 2>/dev/null + cp tmp_coverage.info coverage.info exit 0 fi diff --git a/ext/CMakeLists.txt b/ext/CMakeLists.txt index 75205ac71e..fa8f1e9498 100644 --- a/ext/CMakeLists.txt +++ b/ext/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(src) +include_directories(include) if(BUILD_TESTING) add_subdirectory(test) diff --git a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h new file mode 100644 index 0000000000..0605edc9fc --- /dev/null +++ b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h @@ -0,0 +1,232 @@ +#pragma once + +#include "http_operation_curl.h" + +#include +#include +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace http +{ +namespace client +{ +namespace curl +{ + +namespace http_client = opentelemetry::ext::http::client; +const http_client::StatusCode Http_Ok = 200; + +class Request : public http_client::Request +{ +public: + Request() : method_(http_client::Method::Get), uri_("/") {} + + void SetMethod(http_client::Method method) noexcept override { method_ = method; } + + void SetBody(http_client::Body &body) noexcept override { body_ = std::move(body); } + + void AddHeader(nostd::string_view name, nostd::string_view value) noexcept override + { + headers_.insert(std::pair(static_cast(name), + static_cast(value))); + } + + void ReplaceHeader(nostd::string_view name, nostd::string_view value) noexcept override + { + // erase matching headers + auto range = headers_.equal_range(static_cast(name)); + headers_.erase(range.first, range.second); + AddHeader(name, value); + } + + virtual void SetUri(nostd::string_view uri) noexcept override + { + uri_ = static_cast(uri); + } + + void SetTimeoutMs(std::chrono::milliseconds timeout_ms) noexcept override + { + timeout_ms_ = timeout_ms; + } + +public: + http_client::Method method_; + http_client::Body body_; + Headers headers_; + std::string uri_; + std::chrono::milliseconds timeout_ms_{5000}; // ms +}; + +class Response : public http_client::Response +{ +public: + Response() : status_code_(Http_Ok) {} + + virtual const http_client::Body &GetBody() const noexcept override { return body_; } + + virtual bool ForEachHeader( + nostd::function_ref callable) const + noexcept override + { + for (const auto &header : headers_) + { + if (!callable(header.first, header.second)) + { + return false; + } + } + return true; + } + + virtual bool ForEachHeader( + const nostd::string_view &name, + nostd::function_ref callable) const + noexcept override + { + auto range = headers_.equal_range(static_cast(name)); + for (auto it = range.first; it != range.second; ++it) + { + if (!callable(it->first, it->second)) + { + return false; + } + } + return true; + } + + virtual http_client::StatusCode GetStatusCode() const noexcept override { return status_code_; } + +public: + Headers headers_; + http_client::Body body_; + http_client::StatusCode status_code_; +}; + +class SessionManager; + +class Session : public http_client::Session +{ +public: + Session(SessionManager &session_manager, std::string host, uint16_t port = 80) + : session_manager_(session_manager), is_session_active_(false) + { + if (host.rfind("http://", 0) != 0 && host.rfind("https://", 0) != 0) + { + host_ = "http://" + host; // TODO - https support + } + host_ += ":" + std::to_string(port) + "/"; + } + + std::shared_ptr CreateRequest() noexcept override + { + http_request_.reset(new Request()); + return http_request_; + } + + virtual void SendRequest(http_client::EventHandler &callback) noexcept override + { + is_session_active_ = true; + std::string url = host_ + std::string(http_request_->uri_); + auto callback_ptr = &callback; + curl_operation_.reset(new HttpOperation(http_request_->method_, url, callback_ptr, + http_request_->headers_, http_request_->body_, false, + http_request_->timeout_ms_)); + curl_operation_->SendAsync([this, callback_ptr](HttpOperation &operation) { + if (operation.WasAborted()) + { + // Manually cancelled + callback_ptr->OnEvent(http_client::SessionState::Cancelled, ""); + } + + if (operation.GetResponseCode() >= CURL_LAST) + { + // we have a http response + auto response = std::unique_ptr(new Response()); + response->headers_ = operation.GetResponseHeaders(); + response->body_ = operation.GetResponseBody(); + callback_ptr->OnResponse(*response); + } + is_session_active_ = false; + }); + } + + virtual bool CancelSession() noexcept override + { + curl_operation_->Abort(); + return true; + } + + virtual bool FinishSession() noexcept override + { + curl_operation_->Finish(); + return true; + } + + virtual bool IsSessionActive() noexcept override { return is_session_active_; } + + void SetId(uint64_t session_id) { session_id_ = session_id; } + +private: + std::shared_ptr http_request_; + std::string host_; + std::unique_ptr curl_operation_; + uint64_t session_id_; + SessionManager &session_manager_; + bool is_session_active_; +}; + +class SessionManager : public http_client::SessionManager +{ +public: + // The call (curl_global_init) is not thread safe. Ensure this is called only once. + SessionManager() { curl_global_init(CURL_GLOBAL_ALL); } + + std::shared_ptr CreateSession(nostd::string_view host, + uint16_t port = 80) noexcept override + { + auto session = std::make_shared(*this, std::string(host), port); + auto session_id = ++next_session_id_; + session->SetId(session_id); + sessions_.insert({session_id, session}); + return session; + } + + bool CancelAllSessions() noexcept override + { + for (auto &session : sessions_) + { + session.second->CancelSession(); + } + return true; + } + + bool FinishAllSessions() noexcept override + { + for (auto &session : sessions_) + { + session.second->FinishSession(); + } + return true; + } + + void CleanupSession(uint64_t session_id) + { + // TBD = Need to be thread safe + sessions_.erase(session_id); + } + + ~SessionManager() { curl_global_cleanup(); } + +private: + std::atomic next_session_id_; + std::map> sessions_; +}; + +} // namespace curl +} // namespace client +} // namespace http +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/ext/include/opentelemetry/ext/http/client/curl/http_operation_curl.h b/ext/include/opentelemetry/ext/http/client/curl/http_operation_curl.h new file mode 100644 index 0000000000..e1383c88fd --- /dev/null +++ b/ext/include/opentelemetry/ext/http/client/curl/http_operation_curl.h @@ -0,0 +1,502 @@ +#pragma once + +#include "opentelemetry/ext/http/client/http_client.h" + +#include +#include +#include +#include +#include +#include +#include +#ifdef _WIN32 +# include +#else +# include +#endif + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace http +{ +namespace client +{ +namespace curl +{ +namespace http_client = opentelemetry::ext::http::client; +const std::chrono::milliseconds default_http_conn_timeout(5000); // ms +const std::string http_status_regexp = "HTTP\\/\\d\\.\\d (\\d+)\\ .*"; +const std::string http_header_regexp = "(.*)\\: (.*)\\n*"; + +struct curl_ci +{ + bool operator()(const std::string &s1, const std::string &s2) const + { + return std::lexicographical_compare( + s1.begin(), s1.end(), s2.begin(), s2.end(), + [](char c1, char c2) { return ::tolower(c1) < ::tolower(c2); }); + } +}; +using Headers = std::multimap; + +class HttpOperation +{ +public: + void DispatchEvent(http_client::SessionState type, std::string reason = "") + { + if (callback_ != nullptr) + { + callback_->OnEvent(type, reason); + } + } + + std::atomic is_aborted_; // Set to 'true' when async callback is aborted + std::atomic is_finished_; // Set to 'true' when async callback is finished. + + /** + * Create local CURL instance for url and body + * + * @param url + * @param callback + * @param request Request Headers + * @param body Reques Body + * @param raw_response whether to parse the response + * @param httpConnTimeout HTTP connection timeout in seconds + */ + HttpOperation(http_client::Method method, + std::string url, + http_client::EventHandler *callback, + // Default empty headers and empty request body + const Headers &request_headers = Headers(), + const http_client::Body &request_body = http_client::Body(), + // Default connectivity and response size options + bool is_raw_response = false, + std::chrono::milliseconds http_conn_timeout = default_http_conn_timeout) + : // + method_(method), + url_(url), + callback_(callback), + + // Local vars + request_headers_(request_headers), + request_body_(request_body), + // Optional connection params + is_raw_response_(is_raw_response), + http_conn_timeout_(http_conn_timeout), + // Result + res_(CURLE_OK), + sockfd_(0), + is_aborted_(false), + is_finished_(false), + nread_(0) + { + /* get a curl handle */ + curl_ = curl_easy_init(); + if (!curl_) + { + res_ = CURLE_FAILED_INIT; + DispatchEvent(http_client::SessionState::CreateFailed); + return; + } + + curl_easy_setopt(curl_, CURLOPT_VERBOSE, 0); + + // Specify target URL + curl_easy_setopt(curl_, CURLOPT_URL, url_.c_str()); + + // TODO: support ssl cert verification for https request + curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYPEER, 0); // 1L + curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYHOST, 0); // 2L + + // Specify our custom headers + for (auto &kv : this->request_headers_) + { + std::string header = std::string(kv.first); + header += ": "; + header += std::string(kv.second); + headers_chunk_ = curl_slist_append(headers_chunk_, header.c_str()); + } + + if (headers_chunk_ != nullptr) + { + curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, headers_chunk_); + } + + DispatchEvent(http_client::SessionState::Created); + } + + /** + * Destroy CURL instance + */ + virtual ~HttpOperation() + { + // Given the request has not been aborted we should wait for completion here + // This guarantees the lifetime of this request. + if (result_.valid()) + { + result_.wait(); + } + // TBD - Need to be uncomment. This will callback instance is deleted. + // DispatchEvent(http_client::SessionState::Destroy); + res_ = CURLE_OK; + curl_easy_cleanup(curl_); + curl_slist_free_all(headers_chunk_); + ReleaseResponse(); + } + + /** + * Finish CURL instance + */ + virtual void Finish() + { + if (result_.valid() && !is_finished_) + { + result_.wait(); + is_finished_ = true; + } + } + + /** + * Send request synchronously + */ + long Send() + { + ReleaseResponse(); + // Request buffer + const void *request = (request_body_.empty()) ? NULL : &request_body_[0]; + const size_t req_size = request_body_.size(); + if (!curl_) + { + res_ = CURLE_FAILED_INIT; + DispatchEvent(http_client::SessionState::SendFailed); + return res_; + } + + // TODO: control local port to use + // curl_easy_setopt(curl, CURLOPT_LOCALPORT, dcf_port); + + // Perform initial connect, handling the timeout if needed + + curl_easy_setopt(curl_, CURLOPT_CONNECT_ONLY, 1L); + DispatchEvent(http_client::SessionState::Connecting); + res_ = curl_easy_perform(curl_); + + if (CURLE_OK != res_) + { + DispatchEvent(http_client::SessionState::ConnectFailed, + curl_easy_strerror(res_)); // couldn't connect - stage 1 + return res_; + } + + /* Extract the socket from the curl handle - we'll need it for waiting. + * Note that this API takes a pointer to a 'long' while we use + * curl_socket_t for sockets otherwise. + */ + long sockextr = 0; + res_ = curl_easy_getinfo(curl_, CURLINFO_LASTSOCKET, &sockextr); + + if (CURLE_OK != res_) + { + DispatchEvent(http_client::SessionState::ConnectFailed, + curl_easy_strerror(res_)); // couldn't connect - stage 2 + return res_; + } + + /* wait for the socket to become ready for sending */ + sockfd_ = sockextr; + if (!WaitOnSocket(sockfd_, 0, http_conn_timeout_.count()) || is_aborted_) + { + res_ = CURLE_OPERATION_TIMEDOUT; + DispatchEvent( + http_client::SessionState::ConnectFailed, + " Is aborted: " + std::to_string(is_aborted_.load())); // couldn't connect - stage 3 + return res_; + } + + DispatchEvent(http_client::SessionState::Connected); + // once connection is there - switch back to easy perform for HTTP post + curl_easy_setopt(curl_, CURLOPT_CONNECT_ONLY, 0); + + // send all data to our callback function + if (is_raw_response_) + { + curl_easy_setopt(curl_, CURLOPT_HEADER, true); + curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, (void *)&WriteMemoryCallback); + curl_easy_setopt(curl_, CURLOPT_WRITEDATA, (void *)&raw_response_); + } + else + { + curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, (void *)&WriteVectorCallback); + curl_easy_setopt(curl_, CURLOPT_HEADERDATA, (void *)&resp_headers_); + curl_easy_setopt(curl_, CURLOPT_WRITEDATA, (void *)&resp_body_); + } + + // TODO: only two methods supported for now - POST and GET + if (method_ == http_client::Method::Post) + { + // POST + curl_easy_setopt(curl_, CURLOPT_POST, true); + curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, (const char *)request); + curl_easy_setopt(curl_, CURLOPT_POSTFIELDSIZE, req_size); + } + else if (method_ == http_client::Method::Get) + { + // GET + } + else + { + res_ = CURLE_UNSUPPORTED_PROTOCOL; + return res_; + } + + // abort if slower than 4kb/sec during 30 seconds + curl_easy_setopt(curl_, CURLOPT_LOW_SPEED_TIME, 30L); + curl_easy_setopt(curl_, CURLOPT_LOW_SPEED_LIMIT, 4096); + DispatchEvent(http_client::SessionState::Sending); + + res_ = curl_easy_perform(curl_); + if (CURLE_OK != res_) + { + DispatchEvent(http_client::SessionState::SendFailed, curl_easy_strerror(res_)); + return res_; + } + + /* Code snippet to parse raw HTTP response. This might come in handy + * if we ever consider to handle the raw upload instead of curl_easy_perform + ... + std::string resp((const char *)response); + std::regex http_status_regex(HTTP_STATUS_REGEXP); + std::smatch match; + if(std::regex_search(resp, match, http_status_regex)) + http_code = std::stol(match[1]); + ... + */ + + /* libcurl is nice enough to parse the http response code itself: */ + curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &res_); + // We got some response from server. Dump the contents. + DispatchEvent(http_client::SessionState::Response); + + // This function returns: + // - on success: HTTP status code. + // - on failure: CURL error code. + // The two sets of enums (CURLE, HTTP codes) - do not intersect, so we collapse them in one set. + return res_; + } + + std::future &SendAsync(std::function callback = nullptr) + { + result_ = std::async(std::launch::async, [this, callback] { + long result = Send(); + if (callback != nullptr) + { + callback(*this); + } + return result; + }); + return result_; + } + + /** + * Get HTTP response code. This function returns CURL error code if HTTP response code is invalid. + */ + long GetResponseCode() { return res_; } + + /** + * Get whether or not response was programmatically aborted + */ + bool WasAborted() { return is_aborted_.load(); } + + /** + * Return a copy of resposne headers + * + * @return + */ + Headers GetResponseHeaders() + { + Headers result; + if (resp_headers_.size() == 0) + return result; + + std::stringstream ss; + std::string headers((const char *)&resp_headers_[0], resp_headers_.size()); + ss.str(headers); + + std::string header; + while (std::getline(ss, header, '\n')) + { + // TODO - Regex below crashes with out-of-memory on CI docker container, so + // switching to string comparison. Need to debug and revert back. + + /*std::smatch match; + std::regex http_headers_regex(http_header_regexp); + if (std::regex_search(header, match, http_headers_regex)) + result.insert(std::pair( + static_cast(match[1]), static_cast(match[2]))); + */ + size_t pos = header.find(": "); + if (pos != std::string::npos) + result.insert( + std::pair(header.substr(0, pos), header.substr(pos + 2))); + } + return result; + } + + /** + * Return a copy of response body + * + * @return + */ + std::vector GetResponseBody() { return resp_body_; } + + /** + * Return a raw copy of response headers+body + * + * @return + */ + std::vector GetRawResponse() { return raw_response_; } + + /** + * Release memory allocated for response + */ + void ReleaseResponse() + { + resp_headers_.clear(); + resp_body_.clear(); + raw_response_.clear(); + } + + /** + * Abort request in connecting or reading state. + */ + void Abort() + { + is_aborted_ = true; + if (curl_ != nullptr) + { + // Simply close the socket - connection reset by peer + if (sockfd_) + { + ::close(sockfd_); + sockfd_ = 0; + } + } + } + + CURL *GetHandle() { return curl_; } + +protected: + const bool is_raw_response_; // Do not split response headers from response body + const std::chrono::milliseconds http_conn_timeout_; // Timeout for connect. Default: 5000ms + + CURL *curl_; // Local curl instance + CURLcode res_; // Curl result OR HTTP status code if successful + + http_client::EventHandler *callback_ = nullptr; + + // Request values + http_client::Method method_; + std::string url_; + const Headers &request_headers_; + const http_client::Body &request_body_; + struct curl_slist *headers_chunk_ = nullptr; + + // Processed response headers and body + std::vector resp_headers_; + std::vector resp_body_; + std::vector raw_response_; + + // Socket parameters + curl_socket_t sockfd_; + + // long sockextr_ = 0; + + curl_off_t nread_; + size_t sendlen_ = 0; // # bytes sent by client + size_t acklen_ = 0; // # bytes ack by server + + std::future result_; + + /** + * Helper routine to wait for data on socket + * + * @param sockfd + * @param for_recv + * @param timeout_ms + * @return + */ + static int WaitOnSocket(curl_socket_t sockfd, int for_recv, long timeout_ms) + { + struct timeval tv; + fd_set infd, outfd, errfd; + int res; + + tv.tv_sec = timeout_ms / 1000; + tv.tv_usec = (timeout_ms % 1000) * 1000; + + FD_ZERO(&infd); + FD_ZERO(&outfd); + FD_ZERO(&errfd); + + FD_SET(sockfd, &errfd); /* always check for error */ + + if (for_recv) + { + FD_SET(sockfd, &infd); + } + else + { + FD_SET(sockfd, &outfd); + } + + /* select() returns the number of signalled sockets or -1 */ + res = select((int)sockfd + 1, &infd, &outfd, &errfd, &tv); + return res; + } + + /** + * Old-school memory allocator + * + * @param contents + * @param size + * @param nmemb + * @param userp + * @return + */ + static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) + { + std::vector *buf = static_cast *>(userp); + buf->insert(buf->end(), static_cast(contents), + static_cast(contents) + (size * nmemb)); + return size * nmemb; + } + + /** + * C++ STL std::vector allocator + * + * @param ptr + * @param size + * @param nmemb + * @param data + * @return + */ + static size_t WriteVectorCallback(void *ptr, + size_t size, + size_t nmemb, + std::vector *data) + { + if (data != nullptr) + { + const unsigned char *begin = (unsigned char *)(ptr); + const unsigned char *end = begin + size * nmemb; + data->insert(data->end(), begin, end); + } + return size * nmemb; + } +}; +} // namespace curl +} // namespace client +} // namespace http +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/common/http_client.h b/ext/include/opentelemetry/ext/http/client/http_client.h similarity index 76% rename from sdk/include/opentelemetry/sdk/common/http_client.h rename to ext/include/opentelemetry/ext/http/client/http_client.h index 73e54f0fe2..a6f1c3e924 100644 --- a/sdk/include/opentelemetry/sdk/common/http_client.h +++ b/ext/include/opentelemetry/ext/http/client/http_client.h @@ -40,12 +40,12 @@ */ OPENTELEMETRY_BEGIN_NAMESPACE -namespace sdk -{ -namespace common +namespace ext { namespace http { +namespace client +{ enum class Method { @@ -60,18 +60,21 @@ enum class Method enum class SessionState { - Created, // session object is created - Ongoing, // session is ongoing - Finished, // session is finished ( this needs to be the final state ) - Queued, // http request is queued - TimedOut, // Request timedout, no response received - Aborted, // http request aborted due to local error, - Cancelled, // http request cancelled, possibly due to session->CancelSession(); - SendingFailed, // http request sending failed + CreateFailed, // session create failed + Created, // session created + Destroyed, // session destroyed + Connecting, // connecting to peer + ConnectFailed, // connection failed + Connected, // connected + Sending, // sending request + SendFailed, // request send failed + Response, // response received + SSLHandshakeFailed, // SSL handshake failed + TimedOut, // request time out NetworkError, // network error - SSLHandshakeFailed, // ssl handshake failed - ReadError, // error while reading response - WriteError // error while writing rquest + ReadError, // error reading response + WriteError, // error writing request + Cancelled // (manually) cancelled }; using Byte = uint8_t; @@ -103,12 +106,12 @@ class Response virtual const Body &GetBody() const noexcept = 0; virtual bool ForEachHeader( - nostd::function_ref callable) const + nostd::function_ref callable) const noexcept = 0; virtual bool ForEachHeader( const nostd::string_view &key, - nostd::function_ref callable) const + nostd::function_ref callable) const noexcept = 0; virtual StatusCode GetStatusCode() const noexcept = 0; @@ -121,7 +124,7 @@ class EventHandler public: virtual void OnResponse(Response &) noexcept = 0; - virtual void OnError(SessionState, nostd::string_view) noexcept = 0; + virtual void OnEvent(SessionState, nostd::string_view) noexcept = 0; virtual void OnConnecting(const SSLCertificate &) noexcept {} @@ -157,7 +160,7 @@ class SessionManager virtual ~SessionManager() = default; }; +} // namespace client } // namespace http -} // namespace common -} // namespace sdk +} // namespace ext OPENTELEMETRY_END_NAMESPACE diff --git a/ext/test/CMakeLists.txt b/ext/test/CMakeLists.txt index 189a03f69c..d12e56796b 100644 --- a/ext/test/CMakeLists.txt +++ b/ext/test/CMakeLists.txt @@ -1 +1,2 @@ add_subdirectory(zpages) +add_subdirectory(http) diff --git a/ext/test/http/CMakeLists.txt b/ext/test/http/CMakeLists.txt new file mode 100644 index 0000000000..ab558d2ca7 --- /dev/null +++ b/ext/test/http/CMakeLists.txt @@ -0,0 +1,11 @@ +find_package(CURL) +if(CURL_FOUND) + set(CURL_LIBRARY "-lcurl") + set(FILENAME curl_http_test) + add_executable(${FILENAME} ${FILENAME}.cc) + include_directories(${CURL_INCLUDE_DIR}) + target_link_libraries(${FILENAME} ${GTEST_BOTH_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} ${CURL_LIBRARIES}) + gtest_add_tests(TARGET ${FILENAME} TEST_PREFIX ext.http.curl. TEST_LIST + ${FILENAME}) +endif() diff --git a/ext/test/http/curl_http_test.cc b/ext/test/http/curl_http_test.cc new file mode 100644 index 0000000000..1c7245ed47 --- /dev/null +++ b/ext/test/http/curl_http_test.cc @@ -0,0 +1,260 @@ +#include "opentelemetry/ext//http/client/curl//http_client_curl.h" +#include "opentelemetry/ext/http/server/http_server.h" + +#include +#include +#include +#include +#include +#include +#include + +#define HTTP_PORT 19000 + +#include + +namespace curl = opentelemetry::ext::http::client::curl; +namespace http_client = opentelemetry::ext::http::client; + +class CustomEventHandler : public http_client::EventHandler +{ +public: + virtual void OnResponse(http_client::Response &response) noexcept override{}; + virtual void OnEvent(http_client::SessionState state, + opentelemetry::nostd::string_view reason) noexcept override + {} + virtual void OnConnecting(const http_client::SSLCertificate &) noexcept {} + virtual ~CustomEventHandler() = default; + bool is_called_ = false; +}; + +class GetEventHandler : public CustomEventHandler +{ + void OnResponse(http_client::Response &response) noexcept override + { + ASSERT_EQ(200, response.GetStatusCode()); + ASSERT_EQ(response.GetBody().size(), 0); + is_called_ = true; + }; +}; + +class PostEventHandler : public CustomEventHandler +{ + void OnResponse(http_client::Response &response) noexcept override + { + ASSERT_EQ(200, response.GetStatusCode()); + std::string body(response.GetBody().begin(), response.GetBody().end()); + ASSERT_EQ(body, "{'k1':'v1', 'k2':'v2', 'k3':'v3'}"); + is_called_ = true; + } +}; + +class BasicCurlHttpTests : public ::testing::Test, public HTTP_SERVER_NS::HttpRequestCallback +{ +protected: + HTTP_SERVER_NS::HttpServer server_; + std::string server_address_; + std::atomic is_setup_; + std::atomic is_running_; + std::vector received_requests_; + std::mutex mtx_requests; + std::condition_variable cv_got_events; + std::mutex cv_m; + +public: + BasicCurlHttpTests() : is_setup_(false), is_running_(false){}; + + virtual void SetUp() override + { + if (is_setup_.exchange(true)) + { + return; + } + int port = server_.addListeningPort(HTTP_PORT); + std::ostringstream os; + os << "localhost:" << port; + server_address_ = "http://" + os.str() + "/simple/"; + server_.setServerName(os.str()); + server_.setKeepalive(false); + server_.addHandler("/simple/", *this); + server_.addHandler("/get/", *this); + server_.addHandler("/post/", *this); + server_.start(); + is_running_ = true; + } + + virtual void TearDown() override + { + if (!is_setup_.exchange(false)) + return; + server_.stop(); + is_running_ = false; + } + + virtual int onHttpRequest(HTTP_SERVER_NS::HttpRequest const &request, + HTTP_SERVER_NS::HttpResponse &response) override + { + if (request.uri == "/get/") + { + std::unique_lock lk(mtx_requests); + received_requests_.push_back(request); + response.headers["Content-Type"] = "text/plain"; + return 200; + } + if (request.uri == "/post/") + { + std::unique_lock lk(mtx_requests); + received_requests_.push_back(request); + response.headers["Content-Type"] = "application/json"; + response.body = "{'k1':'v1', 'k2':'v2', 'k3':'v3'}"; + return 200; + } + return 404; + } + + bool waitForRequests(unsigned timeOutSec, unsigned expected_count = 1) + { + std::unique_lock lk(cv_m); + if (cv_got_events.wait_for(lk, std::chrono::milliseconds(1000 * timeOutSec), + [&] { return received_requests_.size() >= expected_count; })) + { + return true; + } + return false; + } +}; + +TEST_F(BasicCurlHttpTests, DoNothing) {} + +TEST_F(BasicCurlHttpTests, HttpRequest) +{ + curl::Request req; + const char *b = "test-data"; + http_client::Body body = {b, b + strlen(b)}; + http_client::Body body1 = body; + req.SetBody(body); + ASSERT_EQ(req.body_, body1); + req.AddHeader("name1", "value1"); + req.AddHeader("name2", "value2"); + ASSERT_TRUE(req.headers_.find("name1")->second == "value1"); + ASSERT_TRUE(req.headers_.find("name2")->second == "value2"); + + req.ReplaceHeader("name1", "value3"); + ASSERT_EQ(req.headers_.find("name1")->second, "value3"); + + req.SetTimeoutMs(std::chrono::duration(5000)); + ASSERT_EQ(req.timeout_ms_, std::chrono::duration(5000)); +} + +TEST_F(BasicCurlHttpTests, HttpResponse) +{ + curl::Response res; + std::multimap m1 = { + {"name1", "value1_1"}, {"name1", "value1_2"}, {"name2", "value3"}, {"name3", "value3"}}; + res.headers_ = m1; + + const char *b = "test-data"; + http_client::Body body = {b, b + strlen(b)}; + int count = 0; + res.ForEachHeader("name1", [&count](opentelemetry::nostd::string_view name, + opentelemetry::nostd::string_view value) { + if (name != "name1") + return false; + if (value != "value1_1" && value != "value1_2") + return false; + count++; + return true; + }); + ASSERT_EQ(count, 2); + count = 0; + res.ForEachHeader( + [&count](opentelemetry::nostd::string_view name, opentelemetry::nostd::string_view value) { + if (name != "name1" && name != "name2" && name != "name3") + return false; + if (value != "value1_1" && value != "value1_2" && value != "value2" && value != "value3") + return false; + count++; + return true; + }); + ASSERT_EQ(count, 4); +} + +TEST_F(BasicCurlHttpTests, SendGetRequest) +{ + received_requests_.clear(); + curl::SessionManager session_manager; + + auto session = session_manager.CreateSession("127.0.0.1", HTTP_PORT); + auto request = session->CreateRequest(); + request->SetUri("get/"); + GetEventHandler *handler = new GetEventHandler(); + session->SendRequest(*handler); + ASSERT_TRUE(waitForRequests(1, 1)); + session->FinishSession(); + ASSERT_TRUE(handler->is_called_); + delete handler; +} + +TEST_F(BasicCurlHttpTests, SendPostRequest) +{ + received_requests_.clear(); + curl::SessionManager session_manager; + + auto session = session_manager.CreateSession("127.0.0.1", HTTP_PORT); + auto request = session->CreateRequest(); + request->SetUri("post/"); + request->SetMethod(http_client::Method::Post); + + const char *b = "test-data"; + http_client::Body body = {b, b + strlen(b)}; + request->SetBody(body); + request->AddHeader("Content-Type", "text/plain"); + PostEventHandler *handler = new PostEventHandler(); + session->SendRequest(*handler); + ASSERT_TRUE(waitForRequests(1, 1)); + session->FinishSession(); + ASSERT_TRUE(handler->is_called_); + + session_manager.CancelAllSessions(); + session_manager.FinishAllSessions(); + + delete handler; +} + +TEST_F(BasicCurlHttpTests, RequestTimeout) +{ + received_requests_.clear(); + curl::SessionManager session_manager; + + auto session = session_manager.CreateSession("127.0.0.10", HTTP_PORT); // Non Existing address + auto request = session->CreateRequest(); + request->SetUri("get/"); + GetEventHandler *handler = new GetEventHandler(); + session->SendRequest(*handler); + session->FinishSession(); + ASSERT_TRUE(handler->is_called_); + delete handler; +} + +TEST_F(BasicCurlHttpTests, CurlHttpOperations) +{ + GetEventHandler *handler = new GetEventHandler(); + + const char *b = "test-data"; + http_client::Body body = {b, b + strlen(b)}; + + std::multimap m1 = { + {"name1", "value1_1"}, {"name1", "value1_2"}, {"name2", "value3"}, {"name3", "value3"}}; + curl::Headers headers = m1; + curl::HttpOperation http_operations1(http_client::Method::Head, "/get", handler, headers, body, + true); + http_operations1.Send(); + + curl::HttpOperation http_operations2(http_client::Method::Get, "/get", handler, headers, body, + true); + http_operations2.Send(); + + curl::HttpOperation http_operations3(http_client::Method::Get, "/get", handler, headers, body, + false); + http_operations3.Send(); +}