2 changes: 1 addition & 1 deletion src/IO/ReadBufferFromS3.cpp
@@ -458,7 +458,7 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
if (!version_id.empty())
req.SetVersionId(version_id);

req.SetAdditionalCustomHeaderValue("clickhouse-request", fmt::format("attempt={}", attempt));
S3::setClickhouseAttemptNumber(req, attempt);

if (range_end_incl)
{
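The inline header write removed above is now encapsulated in a helper. A minimal sketch of what S3::setClickhouseAttemptNumber could look like, assuming it simply wraps the old SetAdditionalCustomHeaderValue call (the real helper presumably lives in the S3 request utilities, e.g. IO/S3/Requests.h, and may differ):

#include <fmt/format.h>

namespace DB::S3
{

/// Hypothetical sketch: store the ClickHouse-side attempt number in the custom
/// "clickhouse-request" header, exactly as the removed inline code did.
template <typename RequestType>
void setClickhouseAttemptNumber(RequestType & req, size_t attempt)
{
    req.SetAdditionalCustomHeaderValue("clickhouse-request", fmt::format("attempt={}", attempt));
}

}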
159 changes: 86 additions & 73 deletions src/IO/S3/Client.cpp
@@ -21,6 +21,7 @@
#include <aws/core/utils/logging/ErrorMacros.h>

#include <Poco/Net/NetException.h>
#include <Poco/Exception.h>

#include <IO/Expect404ResponseScope.h>
#include <IO/S3/Requests.h>
@@ -501,37 +502,37 @@ Model::GetObjectTaggingOutcome Client::GetObjectTagging(GetObjectTaggingRequest
Model::ListObjectsV2Outcome Client::ListObjectsV2(ListObjectsV2Request & request) const
{
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ true>(
request, [this](const Model::ListObjectsV2Request & req) { return ListObjectsV2(req); });
request, [this](Model::ListObjectsV2Request & req) { return ListObjectsV2(req); });
}

Model::ListObjectsOutcome Client::ListObjects(ListObjectsRequest & request) const
{
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ true>(
request, [this](const Model::ListObjectsRequest & req) { return ListObjects(req); });
request, [this](Model::ListObjectsRequest & req) { return ListObjects(req); });
}

Model::GetObjectOutcome Client::GetObject(GetObjectRequest & request) const
{
return processRequestResult(
doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); }));
doRequest(request, [this](Model::GetObjectRequest & req) { return GetObject(req); }));
}

Model::AbortMultipartUploadOutcome Client::AbortMultipartUpload(AbortMultipartUploadRequest & request) const
{
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::AbortMultipartUploadRequest & req) { return AbortMultipartUpload(req); });
request, [this](Model::AbortMultipartUploadRequest & req) { return AbortMultipartUpload(req); });
}

Model::CreateMultipartUploadOutcome Client::CreateMultipartUpload(CreateMultipartUploadRequest & request) const
{
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::CreateMultipartUploadRequest & req) { return CreateMultipartUpload(req); });
request, [this](Model::CreateMultipartUploadRequest & req) { return CreateMultipartUpload(req); });
}

Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(CompleteMultipartUploadRequest & request) const
{
auto outcome = doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::CompleteMultipartUploadRequest & req) { return CompleteMultipartUpload(req); });
request, [this](Model::CompleteMultipartUploadRequest & req) { return CompleteMultipartUpload(req); });

const auto & key = request.GetKey();
const auto & bucket = request.GetBucket();
@@ -578,42 +579,42 @@ Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(CompleteMu
Model::CopyObjectOutcome Client::CopyObject(CopyObjectRequest & request) const
{
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::CopyObjectRequest & req) { return CopyObject(req); });
request, [this](Model::CopyObjectRequest & req) { return CopyObject(req); });
}

Model::PutObjectOutcome Client::PutObject(PutObjectRequest & request) const
{
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::PutObjectRequest & req) { return PutObject(req); });
request, [this](Model::PutObjectRequest & req) { return PutObject(req); });
}

Model::UploadPartOutcome Client::UploadPart(UploadPartRequest & request) const
{
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::UploadPartRequest & req) { return UploadPart(req); });
request, [this](Model::UploadPartRequest & req) { return UploadPart(req); });
}

Model::UploadPartCopyOutcome Client::UploadPartCopy(UploadPartCopyRequest & request) const
{
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); });
request, [this](Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); });
}

Model::DeleteObjectOutcome Client::DeleteObject(DeleteObjectRequest & request) const
{
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::DeleteObjectRequest & req) { Expect404ResponseScope scope; return DeleteObject(req); });
request, [this](Model::DeleteObjectRequest & req) { Expect404ResponseScope scope; return DeleteObject(req); });
}

Model::DeleteObjectsOutcome Client::DeleteObjects(DeleteObjectsRequest & request) const
{
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, [this](const Model::DeleteObjectsRequest & req) { Expect404ResponseScope scope; return DeleteObjects(req); });
request, [this](Model::DeleteObjectsRequest & req) { Expect404ResponseScope scope; return DeleteObjects(req); });
}

Client::ComposeObjectOutcome Client::ComposeObject(ComposeObjectRequest & request) const
{
auto request_fn = [this](const ComposeObjectRequest & req)
auto request_fn = [this](ComposeObjectRequest & req)
{
auto & endpoint_provider = const_cast<Client &>(*this).accessEndpointProvider();
AWS_OPERATION_CHECK_PTR(endpoint_provider, ComposeObject, Aws::Client::CoreErrors, Aws::Client::CoreErrors::ENDPOINT_RESOLUTION_FAILURE);
@@ -642,7 +643,7 @@ Client::ComposeObjectOutcome Client::ComposeObject(ComposeObjectRequest & reques
}

template <typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
std::invoke_result_t<RequestFn, RequestType &>
Client::doRequest(RequestType & request, RequestFn request_fn) const
{
addAdditionalAMZHeadersToCanonicalHeadersList(request, client_configuration.extra_headers);
@@ -741,88 +742,100 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
}

template <bool IsReadMethod, typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
std::invoke_result_t<RequestFn, RequestType &>
Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request_fn) const
{
/// The S3 client does actually retry network errors.

[Comment by Member Author] I have made more changes in that function, just refactoring in a way that I like more.

/// But it matters when the error occurs.
/// This code retries a specific case: a network error that happens
/// while the XML document is being read from the response body.
/// Since the response body is a stream, network errors are possible during reading,
/// and the S3 client doesn't retry those.

/// Not all requests can be retried in that way.
/// Requests that read the response body in full to build the result can be retried.
/// Requests that expose the response stream as the answer, e.g. GetObject, are not retried by this code.

addAdditionalAMZHeadersToCanonicalHeadersList(request, client_configuration.extra_headers);
auto with_retries = [this, request_fn_ = std::move(request_fn)] (const RequestType & request_)
auto with_retries = [this, request_fn_ = std::move(request_fn)] (RequestType & request_)
{
chassert(client_configuration.retryStrategy);
const Int64 max_attempts = client_configuration.retry_strategy.max_retries + 1;
chassert(max_attempts > 0);
std::exception_ptr last_exception = nullptr;
bool inside_retry_loop = false;
for (Int64 attempt_no = 0; attempt_no < max_attempts; ++attempt_no)

Int64 attempt_no = 1;
std::invoke_result_t<RequestFn, RequestType &> outcome;

auto net_exception_handler = [&]() -> bool /// return true if we should retry
{
incrementProfileEvents<IsReadMethod>(ProfileEvents::S3ReadRequestsErrors, ProfileEvents::S3WriteRequestsErrors);
if (isClientForDisk())
incrementProfileEvents<IsReadMethod>(ProfileEvents::DiskS3ReadRequestsErrors, ProfileEvents::DiskS3WriteRequestsErrors);

tryLogCurrentException(log, fmt::format("Network error on S3 request, attempt {} of {}", attempt_no, max_attempts));

outcome = Aws::Client::AWSError<Aws::Client::CoreErrors>(
Aws::Client::CoreErrors::NETWORK_CONNECTION,
/*name*/ "",
/*message*/ fmt::format("All {} retry attempts failed. Last exception: {}", max_attempts, getCurrentExceptionMessage(false)),
/*retryable*/ true);
[Comment on lines +776 to +780 by Copilot AI, Nov 20, 2025] The outcome variable is being assigned an error outcome in the exception handler, but this assignment may be unnecessary if the lambda returns immediately after. Consider whether this assignment is used when returning from the exception handler, or if it's only used when continuing the retry loop.

[Reply by Member Author] It acts as the last error: if we run out of attempts, we return the error stored in that variable.
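A minimal standalone sketch of the pattern the author describes, with hypothetical names unrelated to the actual ClickHouse types: the loop keeps the most recent failed outcome, and that stored value is what gets returned once attempts are exhausted.

#include <string>

struct Outcome
{
    bool ok = false;
    std::string error;
};

/// Hypothetical illustration: retry up to max_attempts, remembering the last
/// failed Outcome; if every attempt fails, the stored last error is returned.
template <typename Fn>
Outcome retryKeepingLastError(Fn attempt_fn, int max_attempts)
{
    Outcome outcome;
    for (int attempt = 1; attempt <= max_attempts; ++attempt)
    {
        outcome = attempt_fn(attempt);
        if (outcome.ok)
            return outcome; // success: stop retrying
        // failure: loop again; `outcome` keeps the most recent error
    }
    return outcome; // all attempts failed: return the last stored error
}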


// Network exceptions are always retryable, so we could just return true here,
// but we also have to check the query's cancellation points, and the ShouldRetry method already does that.
return client_configuration.retryStrategy->ShouldRetry(outcome.GetError(), /*attemptedRetries*/ -1);
};

for (attempt_no = 1; attempt_no <= max_attempts; ++attempt_no)
{
incrementProfileEvents<IsReadMethod>(ProfileEvents::S3ReadRequestAttempts, ProfileEvents::S3WriteRequestAttempts);
if (isClientForDisk())
incrementProfileEvents<IsReadMethod>(ProfileEvents::DiskS3ReadRequestAttempts, ProfileEvents::DiskS3WriteRequestAttempts);

if (attempt_no > 1)
{
incrementProfileEvents<IsReadMethod>(ProfileEvents::S3ReadRequestRetryableErrors, ProfileEvents::S3WriteRequestRetryableErrors);
if (isClientForDisk())
incrementProfileEvents<IsReadMethod>(ProfileEvents::DiskS3ReadRequestRetryableErrors, ProfileEvents::DiskS3WriteRequestRetryableErrors);

// use the previous attempt number to calculate the delay
updateNextTimeToRetryAfterRetryableError(outcome.GetError(), attempt_no - 1);

// Update the ClickHouse-specific attempt number in the request to help the HTTP client
// choose the right timeouts, which depend on the retry attempt number.
auto clickhouse_request_attempt = getClickhouseAttemptNumber(request_);
setClickhouseAttemptNumber(request_, clickhouse_request_attempt + attempt_no);
}

/// Slowing down due to a previously encountered retryable error, possibly from another thread.
slowDownAfterRetryableError();

try
{
/// The S3 client does actually retry network errors.
/// But it matters when the error occurs.
/// This code retries a specific case: a network error that happens
/// while the XML document is being read from the response body.
/// Since the response body is a stream, network errors are possible during reading,
/// and the S3 client doesn't retry those.

/// Not all requests can be retried in that way.
/// Requests that read the response body in full to build the result can be retried.
/// Requests that expose the response stream as the answer, e.g. GetObject, are not retried by this code.
auto outcome = request_fn_(request_);

if (!outcome.IsSuccess()
/// AWS SDK's built-in per-thread retry logic is disabled.
&& client_configuration.s3_slow_all_threads_after_retryable_error
[Comment by Member Author] It seems like when s3_slow_all_threads_after_retryable_error is `false`, we do not do any retries at all here.

&& attempt_no + 1 < max_attempts
/// Retry attempts are managed by the outer loop, so the attemptedRetries argument can be ignored.
&& client_configuration.retryStrategy->ShouldRetry(outcome.GetError(), /*attemptedRetries*/ -1))
{
incrementProfileEvents<IsReadMethod>(
ProfileEvents::S3ReadRequestRetryableErrors, ProfileEvents::S3WriteRequestRetryableErrors);
if (isClientForDisk())
incrementProfileEvents<IsReadMethod>(
ProfileEvents::DiskS3ReadRequestRetryableErrors, ProfileEvents::DiskS3WriteRequestRetryableErrors);

updateNextTimeToRetryAfterRetryableError(outcome.GetError(), attempt_no);
inside_retry_loop = true;
continue;
}

if (inside_retry_loop)
LOG_TRACE(log, "Request succeeded after {} retries. Max retries: {}", attempt_no, max_attempts);

return outcome;
}
catch (Poco::Net::NetException &)
{
/// This includes "connection reset", "malformed message", and possibly other exceptions.
outcome = request_fn_(request_);

incrementProfileEvents<IsReadMethod>(ProfileEvents::S3ReadRequestsErrors, ProfileEvents::S3WriteRequestsErrors);
if (isClientForDisk())
incrementProfileEvents<IsReadMethod>(ProfileEvents::DiskS3ReadRequestsErrors, ProfileEvents::DiskS3WriteRequestsErrors);

tryLogCurrentException(log, "Will retry");
last_exception = std::current_exception();
if (outcome.IsSuccess())
break;

auto error = Aws::Client::AWSError<Aws::Client::CoreErrors>(Aws::Client::CoreErrors::NETWORK_CONNECTION, /*retry*/ true);
// Do not increment S3ReadRequestsErrors/S3WriteRequestsErrors here; it has already been accounted for in IO/S3/PocoHTTPClient.cpp.

/// Check if query is canceled.
/// Retry attempts are managed by the outer loop, so the attemptedRetries argument can be ignored.
if (!client_configuration.retryStrategy->ShouldRetry(error, /*attemptedRetries*/ -1))
if (!client_configuration.retryStrategy->ShouldRetry(outcome.GetError(), /*attemptedRetries*/ -1))
break;
}
catch (Poco::Net::NetException &)
{
/// This includes "connection reset", "malformed message", and possibly other exceptions.
if (!net_exception_handler())
break;
}
catch (Poco::TimeoutException &)
{
/// This includes "Timeout"
if (!net_exception_handler())
break;

updateNextTimeToRetryAfterRetryableError(error, attempt_no);
inside_retry_loop = true;
}
}

chassert(last_exception);
std::rethrow_exception(last_exception);
return outcome;
};

return doRequest(request, with_retries);
@@ -863,7 +876,7 @@ void Client::updateNextTimeToRetryAfterRetryableError(Aws::Client::AWSError<Aws:
{
if (next_time_to_retry_after_retryable_error.compare_exchange_weak(stored_next_time, next_time_ms))
{
LOG_TRACE(log, "Updated next retry time to {} ms forward after retryable error with code {} ('{}')", sleep_ms, error.GetResponseCode(), error.GetMessage());
LOG_TRACE(log, "Updated next retry time to {} ms forward after retryable error with code {}", sleep_ms, error.GetResponseCode());
break;
}
}
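The loop above publishes a shared "next time to retry" with a compare-and-swap, so concurrent threads converge on a single backoff deadline. A minimal standalone sketch of that pattern, with hypothetical names (the surrounding retry logic of the real code is not reproduced here):

#include <atomic>
#include <cstdint>

std::atomic<uint64_t> next_retry_time_ms{0};

/// Hypothetical sketch: advance the shared deadline to candidate_ms unless
/// another thread has already published a later (or equal) one.
void publishNextRetryTime(uint64_t candidate_ms)
{
    uint64_t stored = next_retry_time_ms.load();
    while (stored < candidate_ms)
    {
        /// compare_exchange_weak reloads `stored` with the current value on
        /// failure, so the loop re-checks whether our candidate is still later.
        if (next_retry_time_ms.compare_exchange_weak(stored, candidate_ms))
            break;
    }
}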
4 changes: 2 additions & 2 deletions src/IO/S3/Client.h
@@ -274,11 +274,11 @@ class Client : private Aws::S3::S3Client
ComposeObjectOutcome ComposeObject(ComposeObjectRequest & request) const;

template <typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
std::invoke_result_t<RequestFn, RequestType &>
doRequest(RequestType & request, RequestFn request_fn) const;

template <bool IsReadMethod, typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
std::invoke_result_t<RequestFn, RequestType &>
doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request_fn) const;

void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
52 changes: 18 additions & 34 deletions src/IO/S3/PocoHTTPClient.cpp
@@ -6,8 +6,8 @@
#if USE_AWS_S3

#include <IO/S3/PocoHTTPClient.h>
#include <IO/S3/Requests.h>

#include <utility>
#include <algorithm>
#include <functional>

@@ -389,30 +389,6 @@ void PocoHTTPClient::observeLatency(const Aws::Http::HttpRequest & request, S3La
}
}

String extractAttemptFromInfo(const Aws::String & request_info)
{
static auto key = Aws::String("attempt=");

auto key_begin = request_info.find(key, 0);
if (key_begin == Aws::String::npos)
return "1";

auto val_begin = key_begin + key.size();
auto val_end = request_info.find(';', val_begin);
if (val_end == Aws::String::npos)
val_end = request_info.size();

return request_info.substr(val_begin, val_end-val_begin);
}

String getOrEmpty(const Aws::Http::HeaderValueCollection & map, const String & key)
{
auto it = map.find(key);
if (it == map.end())
return {};
return it->second;
}
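These string-based helpers are superseded by typed accessors (getSDKAttemptNumber and getClickhouseAttemptNumber). A rough sketch of what the ClickHouse-side getter could look like, assuming it parses the same "clickhouse-request: attempt=N" header the removed code handled (the actual implementation lives elsewhere and may differ):

#include <cstdlib>

/// Hypothetical sketch, mirroring the removed parsing logic above.
size_t getClickhouseAttemptNumber(const Aws::Http::HttpRequest & request)
{
    const auto & headers = request.GetHeaders();
    auto it = headers.find("clickhouse-request");
    if (it == headers.end())
        return 1; // no header: this is the first attempt

    static const Aws::String key = "attempt=";
    auto key_begin = it->second.find(key);
    if (key_begin == Aws::String::npos)
        return 1;

    auto val_begin = key_begin + key.size();
    auto val_end = it->second.find(';', val_begin);
    if (val_end == Aws::String::npos)
        val_end = it->second.size();

    auto value = it->second.substr(val_begin, val_end - val_begin);
    auto attempt = std::strtoul(value.c_str(), nullptr, 10);
    return attempt == 0 ? 1 : attempt; // fall back to 1 on a malformed value
}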

ConnectionTimeouts PocoHTTPClient::getTimeouts(const String & method, bool first_attempt, bool first_byte) const
{
if (!s3_use_adaptive_timeouts)
@@ -455,12 +431,12 @@ String getMethod(const Aws::Http::HttpRequest & request)
}
}

PocoHTTPClient::S3LatencyType PocoHTTPClient::getFirstByteLatencyType(const String & sdk_attempt, const String & ch_attempt)
PocoHTTPClient::S3LatencyType PocoHTTPClient::getFirstByteLatencyType(size_t sdk_attempt, size_t ch_attempt)
{
S3LatencyType result = S3LatencyType::FirstByteAttempt1;
if (sdk_attempt != "1" || ch_attempt != "1")
if (sdk_attempt != 1 || ch_attempt != 1)
{
if ((sdk_attempt == "1" && ch_attempt == "2") || (sdk_attempt == "2" && ch_attempt == "1"))
if ((sdk_attempt == 1 && ch_attempt == 2) || (sdk_attempt == 2 && ch_attempt == 1))
result = S3LatencyType::FirstByteAttempt2;
else
result = S3LatencyType::FirstByteAttemptN;
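A tiny standalone illustration of the mapping implemented above, with a hypothetical enum and function name (the real method is a static member of PocoHTTPClient):

enum class LatencyBucket { FirstByteAttempt1, FirstByteAttempt2, FirstByteAttemptN };

/// Equivalent restatement of the logic above: the pair of attempt counters
/// is bucketed into "first attempt", "second attempt", or "later attempt".
LatencyBucket classify(size_t sdk_attempt, size_t ch_attempt)
{
    if (sdk_attempt == 1 && ch_attempt == 1)
        return LatencyBucket::FirstByteAttempt1; // true first attempt
    if ((sdk_attempt == 1 && ch_attempt == 2) || (sdk_attempt == 2 && ch_attempt == 1))
        return LatencyBucket::FirstByteAttempt2; // exactly one retry so far
    return LatencyBucket::FirstByteAttemptN;     // anything later
}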
@@ -479,12 +455,20 @@ void PocoHTTPClient::makeRequestInternalImpl(
auto uri = request.GetUri().GetURIString();
auto method = getMethod(request);

auto sdk_attempt = extractAttemptFromInfo(getOrEmpty(request.GetHeaders(), Aws::Http::SDK_REQUEST_HEADER));
auto ch_attempt = extractAttemptFromInfo(getOrEmpty(request.GetHeaders(), "clickhouse-request"));
bool first_attempt = ch_attempt == "1" && sdk_attempt == "1";

if (enable_s3_requests_logging)
LOG_TEST(log, "Make request to: {}, aws sdk attempt: {}, clickhouse attempt: {}", uri, sdk_attempt, ch_attempt);
auto sdk_attempt = getSDKAttemptNumber(request);
auto ch_attempt = getClickhouseAttemptNumber(request);
bool first_attempt = ch_attempt == 1 && sdk_attempt == 1;

if (!first_attempt)
LOG_DEBUG(
log,
"Retrying S3 request to: {}, aws sdk attempt: {}, clickhouse attempt: {}, kind: {}",
uri, sdk_attempt, ch_attempt, getMetricKind(request) == S3MetricKind::Read ? "Read" : "Write");
else // if (enable_s3_requests_logging)
LOG_TEST(
log,
"Make S3 request to: {}, aws sdk attempt: {}, clickhouse attempt: {}, kind: {}",
uri, sdk_attempt, ch_attempt, getMetricKind(request) == S3MetricKind::Read ? "Read" : "Write");

switch (request.GetMethod())
{
2 changes: 1 addition & 1 deletion src/IO/S3/PocoHTTPClient.h
@@ -195,7 +195,7 @@ class PocoHTTPClient : public Aws::Http::HttpClient
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;

static S3LatencyType getFirstByteLatencyType(const String & sdk_attempt, const String & ch_attempt);
static S3LatencyType getFirstByteLatencyType(size_t sdk_attempt, size_t ch_attempt);

protected:
virtual void makeRequestInternal(