From 643afb4cd6febb2415d1af3fd3aebc121eb345ea Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Sat, 19 Apr 2025 22:38:06 +0200 Subject: [PATCH] perf: improve mutex contention The telemetry refactoring exposed high mutex contention in our NGINX integration during shutdown/reload phase. This commit addresses mutex contention in the curl integration used by the default http client. --- src/datadog/curl.cpp | 76 +++++++++++++++++++++++--------------------- test/test_curl.cpp | 10 +++++- 2 files changed, 49 insertions(+), 37 deletions(-) diff --git a/src/datadog/curl.cpp b/src/datadog/curl.cpp index 33203dcb..2657f98b 100644 --- a/src/datadog/curl.cpp +++ b/src/datadog/curl.cpp @@ -214,7 +214,7 @@ class CurlImpl { }; void run(); - void handle_message(const CURLMsg &, std::unique_lock &); + void handle_message(const CURLMsg &); CURLcode log_on_error(CURLcode result); CURLMcode log_on_error(CURLMcode result); @@ -386,16 +386,13 @@ Expected CurlImpl::post( handle.get(), (url.scheme + "://" + url.authority + url.path).c_str())); } - std::list node; - node.push_back(handle.get()); { std::lock_guard lock(mutex_); - new_handles_.splice(new_handles_.end(), node); - - (void)headers.release(); - (void)handle.release(); - (void)request.release(); + new_handles_.emplace_back(handle.get()); } + std::ignore = headers.release(); + std::ignore = handle.release(); + std::ignore = request.release(); log_on_error(curl_.multi_wakeup(multi_handle_)); @@ -405,6 +402,8 @@ Expected CurlImpl::post( } void CurlImpl::drain(std::chrono::steady_clock::time_point deadline) { + log_on_error(curl_.multi_wakeup(multi_handle_)); + std::unique_lock lock(mutex_); no_requests_.wait_until(lock, deadline, [this]() { return num_active_handles_ == 0 && new_handles_.empty(); @@ -465,31 +464,32 @@ CURLMcode CurlImpl::log_on_error(CURLMcode result) { } void CurlImpl::run() { - int num_messages_remaining; - CURLMsg *message; - constexpr int max_wait_milliseconds = 10000; - std::unique_lock lock(mutex_); + int num_active_handles = 0; + int num_messages_remaining = 0; - for (;;) { - log_on_error(curl_.multi_perform(multi_handle_, &num_active_handles_)); - if (num_active_handles_ == 0) { - no_requests_.notify_all(); - } + bool shutting_down = false; + CURLMsg *message = nullptr; + constexpr int max_wait_milliseconds = 10'000; - // If a request is done or errored out, curl will enqueue a "message" for - // us to handle. Handle any pending messages. - while ((message = curl_.multi_info_read(multi_handle_, - &num_messages_remaining))) { - handle_message(*message, lock); - } - lock.unlock(); - log_on_error(curl_.multi_poll(multi_handle_, nullptr, 0, - max_wait_milliseconds, nullptr)); + std::unique_lock lock(mutex_, std::defer_lock); + std::list handles_to_process; + + while (true) { lock.lock(); + shutting_down = shutting_down_; + + handles_to_process.splice(handles_to_process.begin(), new_handles_); + assert(new_handles_.empty()); + + num_active_handles_ = + num_active_handles + static_cast(handles_to_process.size()); + lock.unlock(); + + no_requests_.notify_all(); // New requests might have been added while we were sleeping. - for (; !new_handles_.empty(); new_handles_.pop_front()) { - CURL *handle = new_handles_.front(); + for (; !handles_to_process.empty(); handles_to_process.pop_front()) { + CURL *handle = handles_to_process.front(); char *user_data; if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) != CURLE_OK) { @@ -528,9 +528,18 @@ void CurlImpl::run() { request_handles_.insert(handle); } - if (shutting_down_) { - break; + if (shutting_down) break; + + log_on_error(curl_.multi_perform(multi_handle_, &num_active_handles)); + + // If a request is done or errored out, curl will enqueue a "message" for + // us to handle. Handle any pending messages. + while ((message = curl_.multi_info_read(multi_handle_, + &num_messages_remaining))) { + handle_message(*message); } + log_on_error(curl_.multi_poll(multi_handle_, nullptr, 0, + max_wait_milliseconds, nullptr)); } // We're shutting down. Clean up any remaining request handles. @@ -548,8 +557,7 @@ void CurlImpl::run() { request_handles_.clear(); } -void CurlImpl::handle_message(const CURLMsg &message, - std::unique_lock &lock) { +void CurlImpl::handle_message(const CURLMsg &message) { if (message.msg != CURLMSG_DONE) { return; } @@ -571,10 +579,8 @@ void CurlImpl::handle_message(const CURLMsg &message, error_message += curl_.easy_strerror(result); error_message += "): "; error_message += request.error_buffer; - lock.unlock(); request.on_error( Error{Error::CURL_REQUEST_FAILURE, std::move(error_message)}); - lock.lock(); } else { long status; if (log_on_error(curl_.easy_getinfo_response_code(request_handle, @@ -582,10 +588,8 @@ void CurlImpl::handle_message(const CURLMsg &message, status = -1; } HeaderReader reader(&request.response_headers_lower); - lock.unlock(); request.on_response(static_cast(status), reader, std::move(request.response_body)); - lock.lock(); } log_on_error(curl_.multi_remove_handle(multi_handle_, request_handle)); diff --git a/test/test_curl.cpp b/test/test_curl.cpp index 5649a285..d9c72bcc 100644 --- a/test/test_curl.cpp +++ b/test/test_curl.cpp @@ -26,6 +26,12 @@ class SingleRequestMockCurlLibrary : public CurlLibrary { WriteCallback on_write_ = nullptr; CURL *added_handle_ = nullptr; CURLMsg message_; + enum class state { + unknown, + added, + performed, + finished + } state_ = state::unknown; // Since `SingleRequestMockCurlLibrary` supports at most one request, // `created_handles_` and `destroyed_handles_` will have size zero or one. std::unordered_multiset created_handles_; @@ -76,6 +82,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary { CURLMcode multi_add_handle(CURLM *, CURL *easy_handle) override { added_handle_ = easy_handle; + state_ = state::added; return CURLM_OK; } CURLMsg *multi_info_read(CURLM *, int *msgs_in_queue) override { @@ -85,7 +92,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary { } *msgs_in_queue = added_handle_ != nullptr; - if (*msgs_in_queue == 0) { + if (*msgs_in_queue == 0 || state_ != state::performed) { return nullptr; } message_.msg = CURLMSG_DONE; @@ -127,6 +134,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary { REQUIRE(on_write_(body.data() + body.size() / 2, 1, remaining, user_data_on_write_) == remaining); + state_ = state::performed; return CURLM_OK; } CURLMcode multi_remove_handle(CURLM *, CURL *easy_handle) override {