Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 40 additions & 36 deletions src/datadog/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class CurlImpl {
};

void run();
void handle_message(const CURLMsg &, std::unique_lock<std::mutex> &);
void handle_message(const CURLMsg &);
CURLcode log_on_error(CURLcode result);
CURLMcode log_on_error(CURLMcode result);

Expand Down Expand Up @@ -386,16 +386,13 @@ Expected<void> CurlImpl::post(
handle.get(), (url.scheme + "://" + url.authority + url.path).c_str()));
}

std::list<CURL *> node;
node.push_back(handle.get());
{
std::lock_guard<std::mutex> lock(mutex_);
new_handles_.splice(new_handles_.end(), node);

(void)headers.release();
(void)handle.release();
(void)request.release();
new_handles_.emplace_back(handle.get());
}
std::ignore = headers.release();
std::ignore = handle.release();
std::ignore = request.release();

log_on_error(curl_.multi_wakeup(multi_handle_));

Expand All @@ -405,6 +402,8 @@ Expected<void> CurlImpl::post(
}

void CurlImpl::drain(std::chrono::steady_clock::time_point deadline) {
log_on_error(curl_.multi_wakeup(multi_handle_));

std::unique_lock<std::mutex> lock(mutex_);
no_requests_.wait_until(lock, deadline, [this]() {
return num_active_handles_ == 0 && new_handles_.empty();
Expand Down Expand Up @@ -465,31 +464,32 @@ CURLMcode CurlImpl::log_on_error(CURLMcode result) {
}

void CurlImpl::run() {
int num_messages_remaining;
CURLMsg *message;
constexpr int max_wait_milliseconds = 10000;
std::unique_lock<std::mutex> lock(mutex_);
int num_active_handles = 0;
int num_messages_remaining = 0;

for (;;) {
log_on_error(curl_.multi_perform(multi_handle_, &num_active_handles_));
if (num_active_handles_ == 0) {
no_requests_.notify_all();
}
bool shutting_down = false;
CURLMsg *message = nullptr;
constexpr int max_wait_milliseconds = 10'000;

// If a request is done or errored out, curl will enqueue a "message" for
// us to handle. Handle any pending messages.
while ((message = curl_.multi_info_read(multi_handle_,
&num_messages_remaining))) {
handle_message(*message, lock);
}
lock.unlock();
log_on_error(curl_.multi_poll(multi_handle_, nullptr, 0,
max_wait_milliseconds, nullptr));
std::unique_lock<std::mutex> lock(mutex_, std::defer_lock);
std::list<CURL *> handles_to_process;

while (true) {
lock.lock();
shutting_down = shutting_down_;

handles_to_process.splice(handles_to_process.begin(), new_handles_);
assert(new_handles_.empty());

num_active_handles_ =
num_active_handles + static_cast<int>(handles_to_process.size());
lock.unlock();

no_requests_.notify_all();

// New requests might have been added while we were sleeping.
for (; !new_handles_.empty(); new_handles_.pop_front()) {
CURL *handle = new_handles_.front();
for (; !handles_to_process.empty(); handles_to_process.pop_front()) {
CURL *handle = handles_to_process.front();
char *user_data;
if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) !=
CURLE_OK) {
Expand Down Expand Up @@ -528,9 +528,18 @@ void CurlImpl::run() {
request_handles_.insert(handle);
}

if (shutting_down_) {
break;
if (shutting_down) break;

log_on_error(curl_.multi_perform(multi_handle_, &num_active_handles));

// If a request is done or errored out, curl will enqueue a "message" for
// us to handle. Handle any pending messages.
while ((message = curl_.multi_info_read(multi_handle_,
&num_messages_remaining))) {
handle_message(*message);
}
log_on_error(curl_.multi_poll(multi_handle_, nullptr, 0,
max_wait_milliseconds, nullptr));
}

// We're shutting down. Clean up any remaining request handles.
Expand All @@ -548,8 +557,7 @@ void CurlImpl::run() {
request_handles_.clear();
}

void CurlImpl::handle_message(const CURLMsg &message,
std::unique_lock<std::mutex> &lock) {
void CurlImpl::handle_message(const CURLMsg &message) {
if (message.msg != CURLMSG_DONE) {
return;
}
Expand All @@ -571,21 +579,17 @@ void CurlImpl::handle_message(const CURLMsg &message,
error_message += curl_.easy_strerror(result);
error_message += "): ";
error_message += request.error_buffer;
lock.unlock();
request.on_error(
Error{Error::CURL_REQUEST_FAILURE, std::move(error_message)});
lock.lock();
} else {
long status;
if (log_on_error(curl_.easy_getinfo_response_code(request_handle,
&status)) != CURLE_OK) {
status = -1;
}
HeaderReader reader(&request.response_headers_lower);
lock.unlock();
request.on_response(static_cast<int>(status), reader,
std::move(request.response_body));
lock.lock();
}

log_on_error(curl_.multi_remove_handle(multi_handle_, request_handle));
Expand Down
10 changes: 9 additions & 1 deletion test/test_curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
WriteCallback on_write_ = nullptr;
CURL *added_handle_ = nullptr;
CURLMsg message_;
enum class state {
unknown,
added,
performed,
finished
} state_ = state::unknown;
// Since `SingleRequestMockCurlLibrary` supports at most one request,
// `created_handles_` and `destroyed_handles_` will have size zero or one.
std::unordered_multiset<CURL *> created_handles_;
Expand Down Expand Up @@ -76,6 +82,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {

CURLMcode multi_add_handle(CURLM *, CURL *easy_handle) override {
added_handle_ = easy_handle;
state_ = state::added;
return CURLM_OK;
}
CURLMsg *multi_info_read(CURLM *, int *msgs_in_queue) override {
Expand All @@ -85,7 +92,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
}

*msgs_in_queue = added_handle_ != nullptr;
if (*msgs_in_queue == 0) {
if (*msgs_in_queue == 0 || state_ != state::performed) {
return nullptr;
}
message_.msg = CURLMSG_DONE;
Expand Down Expand Up @@ -127,6 +134,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
REQUIRE(on_write_(body.data() + body.size() / 2, 1, remaining,
user_data_on_write_) == remaining);

state_ = state::performed;
return CURLM_OK;
}
CURLMcode multi_remove_handle(CURLM *, CURL *easy_handle) override {
Expand Down