diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 37ec4bc6da7dc0..41449fa82ece43 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1262,7 +1262,7 @@ void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest s3_conf.use_virtual_addressing = !resource.s3_storage_param.use_path_style; std::shared_ptr fs; if (existed_resource.fs == nullptr) { - st = io::S3FileSystem::create(s3_conf, std::to_string(resource.id), &fs); + st = io::S3FileSystem::create(s3_conf, std::to_string(resource.id), nullptr, &fs); } else { fs = std::static_pointer_cast(existed_resource.fs); st = fs->set_conf(s3_conf); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index be7a78b198fca1..88f7f9b12dd011 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1190,6 +1190,9 @@ DEFINE_mBool(check_segment_when_build_rowset_meta, "false"); DEFINE_mInt32(max_s3_client_retry, "10"); +DEFINE_mInt32(s3_read_base_wait_time_ms, "100"); +DEFINE_mInt32(s3_read_max_wait_time_ms, "800"); + // ca_cert_file is in this path by default, Normally no modification is required // ca cert default path is different from different OS DEFINE_mString(ca_cert_file_paths, diff --git a/be/src/common/config.h b/be/src/common/config.h index da399de9ab273c..9d33e4d7964fee 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1267,6 +1267,13 @@ DECLARE_mBool(check_segment_when_build_rowset_meta); // max s3 client retry times DECLARE_mInt32(max_s3_client_retry); +// When an S3 "get" request fails with a 429 (too many requests) error, it +// sleeps s3_read_base_wait_time_ms * 2^n ms before the n-th retry (*2, *4, *8, ...) +// and then tries again. 
+// The max sleep time is s3_read_max_wait_time_ms +// and the max retry time is max_s3_client_retry +DECLARE_mInt32(s3_read_base_wait_time_ms); +DECLARE_mInt32(s3_read_max_wait_time_ms); // write as inverted index tmp directory DECLARE_String(tmp_file_dir); diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 777a3505701d8a..13d31ea5110598 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -87,7 +87,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env, RETURN_IF_ERROR( S3ClientFactory::convert_properties_to_s3_conf(properties, s3_uri, &s3_conf)); std::shared_ptr fs; - RETURN_IF_ERROR(io::S3FileSystem::create(s3_conf, "", &fs)); + RETURN_IF_ERROR(io::S3FileSystem::create(s3_conf, "", nullptr, &fs)); RETURN_IF_ERROR(fs->create_file(path, &file_writer, opts)); break; } @@ -134,7 +134,7 @@ Status FileFactory::create_file_reader(const io::FileSystemProperties& system_pr } case TFileType::FILE_S3: { RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description, - reader_options, file_system, file_reader)); + reader_options, file_system, file_reader, profile)); break; } case TFileType::FILE_HDFS: { @@ -213,13 +213,13 @@ Status FileFactory::create_s3_reader(const std::map& p const io::FileDescription& fd, const io::FileReaderOptions& reader_options, std::shared_ptr* s3_file_system, - io::FileReaderSPtr* reader) { + io::FileReaderSPtr* reader, RuntimeProfile* profile) { S3URI s3_uri(fd.path); RETURN_IF_ERROR(s3_uri.parse()); S3Conf s3_conf; RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, &s3_conf)); std::shared_ptr fs; - RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", &fs)); + RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", profile, &fs)); RETURN_IF_ERROR(fs->open_file(fd.path, reader, &reader_options)); *s3_file_system = std::move(fs); return Status::OK(); diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h 
index 48e4e2e9ed0a03..5fc29374eba2b5 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -98,7 +98,7 @@ class FileFactory { const io::FileDescription& fd, const io::FileReaderOptions& reader_options, std::shared_ptr* s3_file_system, - io::FileReaderSPtr* reader); + io::FileReaderSPtr* reader, RuntimeProfile* profile); static Status create_broker_reader(const TNetworkAddress& broker_addr, const std::map& prop, diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index c78eabd09d78b8..48955f2eb63581 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -35,6 +35,7 @@ #include "io/fs/s3_common.h" #include "util/bvar_helper.h" #include "util/doris_metrics.h" +#include "util/runtime_profile.h" #include "util/s3_util.h" namespace doris { @@ -44,13 +45,23 @@ bvar::Adder s3_file_reader_read_counter("s3_file_reader", "read_at"); bvar::Adder s3_file_reader_total("s3_file_reader", "total_num"); bvar::Adder s3_bytes_read_total("s3_file_reader", "bytes_read"); bvar::Adder s3_file_being_read("s3_file_reader", "file_being_read"); +bvar::Adder s3_file_reader_too_many_request_counter("s3_file_reader", "too_many_request"); +bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS +bvar::PerSecond> s3_read_througthput("s3_file_reader", "s3_read_throughput", + &s3_bytes_read_total); +// Although QPS can be derived from s3_bytes_per_read, s3_bytes_per_read only +// records successful requests, whereas s3_get_request_qps records all requests. 
+bvar::PerSecond> s3_get_request_qps("s3_file_reader", "s3_get_request", + &s3_file_reader_read_counter); -S3FileReader::S3FileReader(size_t file_size, std::string key, std::shared_ptr fs) +S3FileReader::S3FileReader(size_t file_size, std::string key, std::shared_ptr fs, + RuntimeProfile* profile) : _path(fmt::format("s3://{}/{}", fs->s3_conf().bucket, key)), _file_size(file_size), _bucket(fs->s3_conf().bucket), _key(std::move(key)), - _fs(std::move(fs)) { + _fs(std::move(fs)), + _profile(profile) { DorisMetrics::instance()->s3_file_open_reading->increment(1); DorisMetrics::instance()->s3_file_reader_total->increment(1); s3_file_reader_total << 1; @@ -98,20 +109,70 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea return Status::InternalError("init s3 client error"); } SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency); - auto outcome = client->GetObject(request); - if (!outcome.IsSuccess()) { - return s3fs_error(outcome.GetError(), - fmt::format("failed to read from {}", _path.native())); + + int retry_count = 0; + const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds + const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds + const int max_retries = config::max_s3_client_retry; // Max retries on 429; the wait doubles per retry (in ms), capped at max_wait_time + + int total_sleep_time = 0; + while (retry_count <= max_retries) { + s3_file_reader_read_counter << 1; + auto outcome = client->GetObject(request); + _s3_stats.total_get_request_counter++; + if (!outcome.IsSuccess()) { + auto error = outcome.GetError(); + if (error.GetResponseCode() == Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS) { + s3_file_reader_too_many_request_counter << 1; + retry_count++; + int wait_time = std::min(base_wait_time * (1 << retry_count), + max_wait_time); // Exponential backoff + std::this_thread::sleep_for(std::chrono::milliseconds(wait_time)); + _s3_stats.too_many_request_err_counter++; + 
_s3_stats.too_many_request_sleep_time_ms += wait_time; + total_sleep_time += wait_time; + continue; + } else { + // Handle other errors + return s3fs_error(outcome.GetError(), "failed to read"); + } + } + *bytes_read = outcome.GetResult().GetContentLength(); + if (*bytes_read != bytes_req) { + return Status::InternalError("failed to read (bytes read: {}, bytes req: {})", + *bytes_read, bytes_req); + } + _s3_stats.total_bytes_read += bytes_req; + s3_bytes_read_total << bytes_req; + s3_bytes_per_read << bytes_req; + DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req); + if (retry_count > 0) { + LOG(INFO) << fmt::format("read s3 file {} succeed after {} times with {} ms sleeping", + _path.native(), retry_count, total_sleep_time); + } + return Status::OK(); } - *bytes_read = outcome.GetResult().GetContentLength(); - if (*bytes_read != bytes_req) { - return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})", - _path.native(), *bytes_read, bytes_req); + return Status::InternalError("failed to read from s3, exceeded maximum retries"); +} + +void S3FileReader::_collect_profile_before_close() { + if (_profile != nullptr) { + const char* s3_profile_name = "S3Profile"; + ADD_TIMER(_profile, s3_profile_name); + RuntimeProfile::Counter* total_get_request_counter = + ADD_CHILD_COUNTER(_profile, "TotalGetRequest", TUnit::UNIT, s3_profile_name); + RuntimeProfile::Counter* too_many_request_err_counter = + ADD_CHILD_COUNTER(_profile, "TooManyRequestErr", TUnit::UNIT, s3_profile_name); + RuntimeProfile::Counter* too_many_request_sleep_time = ADD_CHILD_COUNTER( + _profile, "TooManyRequestSleepTime", TUnit::TIME_MS, s3_profile_name); + RuntimeProfile::Counter* total_bytes_read = + ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, s3_profile_name); + + COUNTER_UPDATE(total_get_request_counter, _s3_stats.total_get_request_counter); + COUNTER_UPDATE(too_many_request_err_counter, _s3_stats.too_many_request_err_counter); + 
COUNTER_UPDATE(too_many_request_sleep_time, _s3_stats.too_many_request_sleep_time_ms); + COUNTER_UPDATE(total_bytes_read, _s3_stats.total_bytes_read); } - s3_bytes_read_total << *bytes_read; - s3_file_reader_read_counter << 1; - DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read); - return Status::OK(); } } // namespace io diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h index 3db265fb534839..c9e9656fe5860b 100644 --- a/be/src/io/fs/s3_file_reader.h +++ b/be/src/io/fs/s3_file_reader.h @@ -31,12 +31,16 @@ #include "util/slice.h" namespace doris { + +class RuntimeProfile; + namespace io { struct IOContext; class S3FileReader final : public FileReader { public: - S3FileReader(size_t file_size, std::string key, std::shared_ptr fs); + S3FileReader(size_t file_size, std::string key, std::shared_ptr fs, + RuntimeProfile* profile); ~S3FileReader() override; @@ -54,7 +58,15 @@ class S3FileReader final : public FileReader { Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; + void _collect_profile_before_close() override; + private: + struct S3Statistics { + int64_t total_get_request_counter = 0; + int64_t too_many_request_err_counter = 0; + int64_t too_many_request_sleep_time_ms = 0; + int64_t total_bytes_read = 0; + }; Path _path; size_t _file_size; @@ -63,6 +75,8 @@ class S3FileReader final : public FileReader { std::shared_ptr _fs; std::atomic _closed = false; + RuntimeProfile* _profile = nullptr; + S3Statistics _s3_stats; }; } // namespace io diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index 947020c89c5ad9..fde6c8a482f8f0 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -72,6 +72,7 @@ #include "io/fs/s3_file_reader.h" #include "io/fs/s3_file_writer.h" #include "util/bvar_helper.h" +#include "util/runtime_profile.h" #include "util/s3_uri.h" #include "util/s3_util.h" @@ -124,14 +125,16 @@ std::string 
S3FileSystem::full_path(std::string_view key) const { return fmt::format("{}/{}/{}", _s3_conf.endpoint, _s3_conf.bucket, key); } -Status S3FileSystem::create(S3Conf s3_conf, std::string id, std::shared_ptr* fs) { - (*fs).reset(new S3FileSystem(std::move(s3_conf), std::move(id))); +Status S3FileSystem::create(S3Conf s3_conf, std::string id, RuntimeProfile* profile, + std::shared_ptr* fs) { + (*fs).reset(new S3FileSystem(std::move(s3_conf), std::move(id), profile)); return (*fs)->connect(); } -S3FileSystem::S3FileSystem(S3Conf&& s3_conf, std::string&& id) +S3FileSystem::S3FileSystem(S3Conf&& s3_conf, std::string&& id, RuntimeProfile* profile) : RemoteFileSystem(s3_conf.prefix, std::move(id), FileSystemType::S3), - _s3_conf(std::move(s3_conf)) { + _s3_conf(std::move(s3_conf)), + _profile(profile) { // FIXME(plat1ko): Normalize prefix // remove the first and last '/' if (!_s3_conf.prefix.empty()) { @@ -174,7 +177,8 @@ Status S3FileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader } GET_KEY(key, file); *reader = std::make_shared( - fsize, std::move(key), std::static_pointer_cast(shared_from_this())); + fsize, std::move(key), std::static_pointer_cast(shared_from_this()), + _profile); return Status::OK(); } diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index 6bd1ae4ff17b29..26bf8186a93e35 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -40,6 +40,8 @@ class PooledThreadExecutor; } // namespace Aws::Utils::Threading namespace doris { +class RuntimeProfile; + namespace io { struct FileInfo; @@ -56,7 +58,8 @@ struct FileInfo; // This class is thread-safe.(Except `set_xxx` method) class S3FileSystem final : public RemoteFileSystem { public: - static Status create(S3Conf s3_conf, std::string id, std::shared_ptr* fs); + static Status create(S3Conf s3_conf, std::string id, RuntimeProfile* profile, + std::shared_ptr* fs); ~S3FileSystem() override; // Guarded by external lock. 
Status set_conf(S3Conf s3_conf); @@ -101,7 +104,7 @@ class S3FileSystem final : public RemoteFileSystem { } private: - S3FileSystem(S3Conf&& s3_conf, std::string&& id); + S3FileSystem(S3Conf&& s3_conf, std::string&& id, RuntimeProfile* profile); // Full path for error message, format: endpoint/bucket/key std::string full_path(std::string_view key) const; @@ -114,6 +117,7 @@ class S3FileSystem final : public RemoteFileSystem { mutable std::mutex _client_mu; std::shared_ptr _client; std::shared_ptr _executor; + RuntimeProfile* _profile = nullptr; }; } // namespace io diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index 0c79da3050c64e..f064bd798f726a 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -104,7 +104,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l RETURN_IF_ERROR(s3_uri.parse()); RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(_prop, s3_uri, &s3_conf)); std::shared_ptr fs; - RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", &fs)); + RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", nullptr, &fs)); _remote_fs = std::move(fs); } else if (TStorageBackendType::type::HDFS == type) { THdfsParams hdfs_params = parse_properties(_prop); diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index f30b71dc8be8c5..64fa5313f30d5c 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -373,7 +373,7 @@ Status VFileResultWriter::_delete_dir() { std::shared_ptr s3_fs = nullptr; RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf( _file_opts->broker_properties, s3_uri, &s3_conf)); - RETURN_IF_ERROR(io::S3FileSystem::create(s3_conf, "", &s3_fs)); + RETURN_IF_ERROR(io::S3FileSystem::create(s3_conf, "", nullptr, &s3_fs)); file_system = s3_fs; break; }