From 5d4bc9946b331aa8fa8d37bcce5d87bb42bede82 Mon Sep 17 00:00:00 2001
From: morningman
Date: Sun, 26 May 2024 10:21:55 +0800
Subject: [PATCH 1/3] [opt](s3) auto retry when meeting 429 error

---
 be/src/io/file_factory.cpp      |  2 +-
 be/src/io/fs/s3_file_reader.cpp | 94 ++++++++++++++++++++++++++-------
 be/src/io/fs/s3_file_reader.h   | 25 +++++++--
 be/src/io/fs/s3_file_system.cpp |  2 +-
 4 files changed, 98 insertions(+), 25 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index e61ed144486c0f..0c84c2eb74c7d8 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -156,7 +156,7 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
         auto client_holder = std::make_shared<io::ObjClientHolder>(s3_conf.client_conf);
         RETURN_IF_ERROR_RESULT(client_holder->init());
         return io::S3FileReader::create(std::move(client_holder), s3_conf.bucket, s3_uri.get_key(),
-                                        file_description.file_size)
+                                        file_description.file_size, profile)
                 .and_then([&](auto&& reader) {
                     return io::create_cached_file_reader(std::move(reader), reader_options);
                 });
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index e7775803198081..7d2864b93f3706 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -37,6 +37,7 @@
 #include "io/fs/s3_common.h"
 #include "util/bvar_helper.h"
 #include "util/doris_metrics.h"
+#include "util/runtime_profile.h"
 #include "util/s3_util.h"

 namespace doris::io {
@@ -45,13 +46,14 @@
 bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
 bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
 bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
 bvar::Adder<uint64_t> s3_file_being_read("s3_file_reader", "file_being_read");
+bvar::Adder<uint64_t> s3_file_reader_too_many_request_counter("s3_file_reader", "too_many_request");
 bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS
 bvar::PerSecond<bvar::Adder<uint64_t>> s3_read_througthput("s3_file_reader", "s3_read_throughput",
                                                            &s3_bytes_read_total);

 Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder> client,
                                             std::string bucket, std::string key,
-                                            int64_t file_size) {
+                                            int64_t file_size, RuntimeProfile* profile) {
     if (file_size < 0) {
         auto res = client->object_file_size(bucket, key);
         if (!res.has_value()) {
@@ -62,16 +64,17 @@ Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHold
     }

     return std::make_shared<S3FileReader>(std::move(client), std::move(bucket), std::move(key),
-                                          file_size);
+                                          file_size, profile);
 }

 S3FileReader::S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket,
-                           std::string key, size_t file_size)
+                           std::string key, size_t file_size, RuntimeProfile* profile)
         : _path(fmt::format("s3://{}/{}", bucket, key)),
           _file_size(file_size),
           _bucket(std::move(bucket)),
          _key(std::move(key)),
-          _client(std::move(client)) {
+          _client(std::move(client)),
+          _profile(profile) {
     DorisMetrics::instance()->s3_file_open_reading->increment(1);
     DorisMetrics::instance()->s3_file_reader_total->increment(1);
     s3_file_reader_total << 1;
@@ -113,23 +116,76 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
     if (!client) {
         return Status::InternalError("init s3 client error");
     }
-    // clang-format off
-    auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
-                                    to, offset, bytes_req, bytes_read);
-    // clang-format on
-    if (resp.status.code != ErrorCode::OK) {
-        return std::move(Status(resp.status.code, std::move(resp.status.msg))
-                                 .append(fmt::format("failed to read from {}", _path.native())));
+    // // clang-format off
+    // auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
+    //                                 to, offset, bytes_req, bytes_read);
+    // // clang-format on
+    // if (resp.status.code != ErrorCode::OK) {
+    //     return std::move(Status(resp.status.code, std::move(resp.status.msg))
+    //                              .append(fmt::format("failed to read from {}", _path.native())));
+    // }
+    // if (*bytes_read != bytes_req) {
+    //     return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
+    //                                  _path.native(), *bytes_read, bytes_req);
+    SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
+
+    int retry_count = 0;
+    const int base_wait_time = 1000; // Base wait time in milliseconds
+    const int max_wait_time = 8000;  // Maximum wait time in milliseconds
+    const int max_retries = 4;       // wait 1s, 2s, 4s, 8s for each backoff
+
+    while (retry_count <= max_retries) {
+        auto outcome = client->GetObject(request);
+        _s3_stats.total_get_request_counter++;
+        if (!outcome.IsSuccess()) {
+            auto error = outcome.GetError();
+            if (error.GetResponseCode() == Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS) {
+                s3_file_reader_too_many_request_counter << 1;
+                retry_count++;
+                int wait_time = std::min(base_wait_time * (1 << retry_count),
+                                         max_wait_time); // Exponential backoff
+                std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
+                _s3_stats.too_many_request_err_counter++;
+                _s3_stats.too_many_request_sleep_time_ms += wait_time;
+                continue;
+            } else {
+                // Handle other errors
+                return s3fs_error(outcome.GetError(), "failed to read");
+            }
+        }
+        *bytes_read = outcome.GetResult().GetContentLength();
+        if (*bytes_read != bytes_req) {
+            return Status::InternalError("failed to read (bytes read: {}, bytes req: {})",
+                                         *bytes_read, bytes_req);
+        }
+        _s3_stats.total_bytes_read += bytes_req;
+        s3_bytes_read_total << bytes_req;
+        s3_bytes_per_read << bytes_req;
+        s3_file_reader_read_counter << 1;
+        DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req);
+        return Status::OK();
     }
-    if (*bytes_read != bytes_req) {
-        return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
-                                     _path.native(), *bytes_read, bytes_req);
+    return Status::InternalError("failed to read from s3, exceeded maximum retries");
+}
+
+void S3FileReader::_collect_profile_before_close() {
+    if (_profile != nullptr) {
+        const char* s3_profile_name = "S3Profile";
+        ADD_TIMER(_profile, s3_profile_name);
+        RuntimeProfile::Counter* total_get_request_counter =
+                ADD_CHILD_COUNTER(_profile, "TotalGetRequest", TUnit::UNIT, s3_profile_name);
+        RuntimeProfile::Counter* too_many_request_err_counter =
+                ADD_CHILD_COUNTER(_profile, "TooManyRequestErr", TUnit::UNIT, s3_profile_name);
+        RuntimeProfile::Counter* too_many_request_sleep_time = ADD_CHILD_COUNTER(
+                _profile, "TooManyRequestSleepTime", TUnit::TIME_MS, s3_profile_name);
+        RuntimeProfile::Counter* total_bytes_read =
+                ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, s3_profile_name);
+
+        COUNTER_UPDATE(total_get_request_counter, _s3_stats.total_get_request_counter);
+        COUNTER_UPDATE(too_many_request_err_counter, _s3_stats.too_many_request_err_counter);
+        COUNTER_UPDATE(too_many_request_sleep_time, _s3_stats.too_many_request_sleep_time_ms);
+        COUNTER_UPDATE(total_bytes_read, _s3_stats.total_bytes_read);
     }
-    s3_bytes_read_total << *bytes_read;
-    s3_bytes_per_read << *bytes_read;
-    s3_file_reader_read_counter << 1;
-    DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
-    return Status::OK();
 }

 } // namespace doris::io
diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h
index d681161ebed90e..a10cbc2ea7e8df 100644
--- a/be/src/io/fs/s3_file_reader.h
+++ b/be/src/io/fs/s3_file_reader.h
@@ -28,16 +28,21 @@
 #include "io/fs/s3_file_system.h"
 #include "util/slice.h"

-namespace doris::io {
+namespace doris {
+class RuntimeProfile;
+
+namespace io {

 struct IOContext;

 class S3FileReader final : public FileReader {
 public:
+<<<<<<< HEAD
     static Result<FileReaderSPtr> create(std::shared_ptr<const ObjClientHolder> client,
-                                         std::string bucket, std::string key, int64_t file_size);
+                                         std::string bucket, std::string key, int64_t file_size,
+                                         RuntimeProfile* profile);

     S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket, std::string key,
-                 size_t file_size);
+                 size_t file_size, RuntimeProfile* profile);

     ~S3FileReader() override;

@@ -53,7 +58,15 @@ class S3FileReader final : public FileReader {
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;

+    void _collect_profile_before_close() override;
+
 private:
+    struct S3Statistics {
+        int64_t total_get_request_counter = 0;
+        int64_t too_many_request_err_counter = 0;
+        int64_t too_many_request_sleep_time_ms = 0;
+        int64_t total_bytes_read = 0;
+    };
     Path _path;
     size_t _file_size;

@@ -62,6 +75,10 @@ class S3FileReader final : public FileReader {
     std::shared_ptr<const ObjClientHolder> _client;

     std::atomic<bool> _closed = false;
+
+    RuntimeProfile* _profile = nullptr;
+    S3Statistics _s3_stats;
 };

-} // namespace doris::io
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 0a6048592b7908..9b7777866259a8 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -196,7 +196,7 @@ Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
 Status S3FileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader,
                                         const FileReaderOptions& opts) {
     auto key = DORIS_TRY(get_key(file));
-    *reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size));
+    *reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size, nullptr));
     return Status::OK();
 }

From 12a10edbb385122d2ca638edc81e261b32b305af Mon Sep 17 00:00:00 2001
From: morningman
Date: Tue, 28 May 2024 23:06:48 +0800
Subject: [PATCH 2/3] 1

---
 be/src/common/config.cpp        |  2 ++
 be/src/common/config.h          |  7 +++++++
 be/src/io/fs/s3_file_reader.cpp | 18 ++++++++++++++----
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5eb0e8d26ba428..9b5861ce4942b3 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1220,6 +1220,8 @@ DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
 DEFINE_mBool(check_segment_when_build_rowset_meta, "false");

 DEFINE_mInt32(max_s3_client_retry, "10");
+DEFINE_mInt32(s3_read_base_wait_time_ms, "100");
+DEFINE_mInt32(s3_read_max_wait_time_ms, "800");

 DEFINE_mBool(enable_s3_rate_limiter, "false");

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 592e96d1dc06ac..f63e28de78498c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1304,6 +1304,13 @@ DECLARE_mBool(check_segment_when_build_rowset_meta);
 DECLARE_mBool(enable_s3_rate_limiter);
 // max s3 client retry times
 DECLARE_mInt32(max_s3_client_retry);
+// When meeting an S3 429 error, the "get" request will sleep for
+// s3_read_base_wait_time_ms (*1, *2, *3, *4) ms and then try again.
+// The max sleep time is s3_read_max_wait_time_ms,
+// and the max retry count is max_s3_client_retry.
+DECLARE_mInt32(s3_read_base_wait_time_ms);
+DECLARE_mInt32(s3_read_max_wait_time_ms);

 // write as inverted index tmp directory
 DECLARE_String(tmp_file_dir);
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index 7d2864b93f3706..8ec0305447440e 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -50,6 +50,10 @@ bvar::Adder<uint64_t> s3_file_reader_too_many_request_counter("s3_file_reader",
 bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS
 bvar::PerSecond<bvar::Adder<uint64_t>> s3_read_througthput("s3_file_reader", "s3_read_throughput",
                                                            &s3_bytes_read_total);
+// Although we can get QPS from s3_bytes_per_read, s3_bytes_per_read only
+// records successful requests, while s3_get_request_qps records all requests.
+bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader", "s3_get_request",
+                                                          &s3_file_reader_read_counter);

 Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder> client,
                                             std::string bucket, std::string key,
@@ -130,11 +134,13 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
     SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);

     int retry_count = 0;
-    const int base_wait_time = 1000; // Base wait time in milliseconds
-    const int max_wait_time = 8000;  // Maximum wait time in milliseconds
-    const int max_retries = 4;       // wait 1s, 2s, 4s, 8s for each backoff
+    const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
+    const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds
+    const int max_retries = config::max_s3_client_retry; // max retry count
+    int total_sleep_time = 0;

     while (retry_count <= max_retries) {
+        s3_file_reader_read_counter << 1;
         auto outcome = client->GetObject(request);
         _s3_stats.total_get_request_counter++;
         if (!outcome.IsSuccess()) {
@@ -147,6 +153,7 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
                 std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
                 _s3_stats.too_many_request_err_counter++;
                 _s3_stats.too_many_request_sleep_time_ms += wait_time;
+                total_sleep_time += wait_time;
                 continue;
             } else {
                 // Handle other errors
@@ -161,8 +168,11 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
         _s3_stats.total_bytes_read += bytes_req;
         s3_bytes_read_total << bytes_req;
         s3_bytes_per_read << bytes_req;
-        s3_file_reader_read_counter << 1;
         DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req);
+        if (retry_count > 0) {
+            LOG(INFO) << fmt::format("read s3 file {} succeeded after {} retries with {} ms of sleep",
+                                     _path.native(), retry_count, total_sleep_time);
+        }
         return Status::OK();
     }
     return Status::InternalError("failed to read from s3, exceeded maximum retries");

From 881b681ddbb72dfc52409a40f85182c751f69285 Mon Sep 17 00:00:00 2001
From: morningman
Date: Mon, 24 Jun 2024 12:52:58 +0800
Subject: [PATCH 3/3] 1

---
 be/src/io/fs/s3_file_reader.cpp | 19 +++++++++++--------
 be/src/io/fs/s3_file_reader.h   |  3 +--
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index 8ec0305447440e..a5c6ec09162cf4 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -56,8 +56,8 @@ bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader", "s3_
                                                           &s3_file_reader_read_counter);

 Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder> client,
-                                            std::string bucket, std::string key,
-                                            int64_t file_size, RuntimeProfile* profile) {
+                                            std::string bucket, std::string key, int64_t file_size,
+                                            RuntimeProfile* profile) {
     if (file_size < 0) {
         auto res = client->object_file_size(bucket, key);
         if (!res.has_value()) {
@@ -141,11 +141,14 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
     int total_sleep_time = 0;
     while (retry_count <= max_retries) {
         s3_file_reader_read_counter << 1;
-        auto outcome = client->GetObject(request);
+        // clang-format off
+        auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
+                                        to, offset, bytes_req, bytes_read);
+        // clang-format on
         _s3_stats.total_get_request_counter++;
-        if (!outcome.IsSuccess()) {
-            auto error = outcome.GetError();
-            if (error.GetResponseCode() == Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS) {
+        if (resp.status.code != ErrorCode::OK) {
+            if (resp.http_code ==
+                static_cast<int>(Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS)) {
                 s3_file_reader_too_many_request_counter << 1;
                 retry_count++;
                 int wait_time = std::min(base_wait_time * (1 << retry_count),
@@ -157,10 +160,10 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
                 continue;
             } else {
                 // Handle other errors
-                return s3fs_error(outcome.GetError(), "failed to read");
+                return std::move(Status(resp.status.code, std::move(resp.status.msg))
+                                         .append("failed to read"));
             }
         }
-        *bytes_read = outcome.GetResult().GetContentLength();
         if (*bytes_read != bytes_req) {
             return Status::InternalError("failed to read (bytes read: {}, bytes req: {})",
                                          *bytes_read, bytes_req);
diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h
index a10cbc2ea7e8df..36fe67b342ce1c 100644
--- a/be/src/io/fs/s3_file_reader.h
+++ b/be/src/io/fs/s3_file_reader.h
@@ -36,10 +36,9 @@ struct IOContext;

 class S3FileReader final : public FileReader {
 public:
-<<<<<<< HEAD
     static Result<FileReaderSPtr> create(std::shared_ptr<const ObjClientHolder> client,
                                          std::string bucket, std::string key, int64_t file_size,
-                                          RuntimeProfile* profile);
+                                         RuntimeProfile* profile);

     S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket, std::string key,
                  size_t file_size, RuntimeProfile* profile);
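
For readers who want the retry schedule in isolation, below is a minimal, self-contained C++
sketch of the backoff loop that these patches converge on. It assumes only the standard
library; do_get is a hypothetical stand-in for the real client->get_object(...) call and simply
reports an HTTP status code. Note the ordering in the patch: retry_count is incremented before
the shift, so the first sleep is base_wait_time * 2 rather than base_wait_time * 1.

#include <algorithm>
#include <chrono>
#include <functional>
#include <thread>

// Retry a GET with exponential backoff on HTTP 429, mirroring the loop in
// S3FileReader::read_at_impl. do_get is a hypothetical placeholder that
// performs one request and returns its HTTP status code.
bool get_with_backoff(const std::function<int()>& do_get, int base_wait_time_ms,
                      int max_wait_time_ms, int max_retries) {
    int retry_count = 0;
    while (retry_count <= max_retries) {
        if (do_get() != 429) {
            return true; // success, or a non-retriable error in the real code
        }
        retry_count++;
        // Sleep doubles each round (2x, 4x, 8x, ... of the base), capped at the max.
        int wait_time = std::min(base_wait_time_ms * (1 << retry_count), max_wait_time_ms);
        std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
    }
    return false; // exceeded maximum retries
}

With the defaults introduced in patch 2 (s3_read_base_wait_time_ms = 100,
s3_read_max_wait_time_ms = 800), the sleep sequence is 200, 400, 800, 800, ... ms,
continuing until max_s3_client_retry attempts are exhausted.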