2 changes: 2 additions & 0 deletions be/src/common/config.cpp
@@ -1220,6 +1220,8 @@ DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
DEFINE_mBool(check_segment_when_build_rowset_meta, "false");

DEFINE_mInt32(max_s3_client_retry, "10");
DEFINE_mInt32(s3_read_base_wait_time_ms, "100");
DEFINE_mInt32(s3_read_max_wait_time_ms, "800");

DEFINE_mBool(enable_s3_rate_limiter, "false");

7 changes: 7 additions & 0 deletions be/src/common/config.h
@@ -1304,6 +1304,13 @@ DECLARE_mBool(check_segment_when_build_rowset_meta);
DECLARE_mBool(enable_s3_rate_limiter);
// max s3 client retry times
DECLARE_mInt32(max_s3_client_retry);
// When an S3 "get" request fails with a 429 (Too Many Requests) error,
// the request sleeps for s3_read_base_wait_time_ms (* 2, * 4, * 8, ...) ms
// and then tries again.
// The max sleep time per retry is s3_read_max_wait_time_ms
// and the max number of retries is max_s3_client_retry.
DECLARE_mInt32(s3_read_base_wait_time_ms);
DECLARE_mInt32(s3_read_max_wait_time_ms);

// write as inverted index tmp directory
DECLARE_String(tmp_file_dir);
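As context for the comment above (not part of this patch), here is a minimal sketch of how the capped exponential backoff plays out with the defaults added in config.cpp (base 100 ms, cap 800 ms, 10 retries); the backoff_ms helper name is hypothetical.

#include <algorithm>
#include <cstdio>

// Hypothetical helper mirroring the documented behavior: the n-th retry
// (1-based, since retry_count is incremented before sleeping) waits
// base_ms * 2^n milliseconds, capped at max_wait_ms.
static int backoff_ms(int retry, int base_ms, int max_wait_ms) {
    return std::min(base_ms * (1 << retry), max_wait_ms);
}

int main() {
    const int base_ms = 100;     // s3_read_base_wait_time_ms default
    const int max_ms = 800;      // s3_read_max_wait_time_ms default
    const int max_retries = 10;  // max_s3_client_retry default
    for (int retry = 1; retry <= max_retries; ++retry) {
        std::printf("retry %d sleeps %d ms\n", retry, backoff_ms(retry, base_ms, max_ms));
    }
    return 0;
}

With these defaults the sleep grows as 200, 400, 800, 800, ... ms, i.e. it saturates at the cap after the third retry.
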
2 changes: 1 addition & 1 deletion be/src/io/file_factory.cpp
@@ -156,7 +156,7 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
auto client_holder = std::make_shared<io::ObjClientHolder>(s3_conf.client_conf);
RETURN_IF_ERROR_RESULT(client_holder->init());
return io::S3FileReader::create(std::move(client_holder), s3_conf.bucket, s3_uri.get_key(),
file_description.file_size)
file_description.file_size, profile)
.and_then([&](auto&& reader) {
return io::create_cached_file_reader(std::move(reader), reader_options);
});
109 changes: 89 additions & 20 deletions be/src/io/fs/s3_file_reader.cpp
@@ -37,6 +37,7 @@
#include "io/fs/s3_common.h"
#include "util/bvar_helper.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/s3_util.h"

namespace doris::io {
@@ -45,13 +46,18 @@ bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
bvar::Adder<uint64_t> s3_file_being_read("s3_file_reader", "file_being_read");
bvar::Adder<uint64_t> s3_file_reader_too_many_request_counter("s3_file_reader", "too_many_request");
bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS
bvar::PerSecond<bvar::Adder<uint64_t>> s3_read_througthput("s3_file_reader", "s3_read_throughput",
&s3_bytes_read_total);
// Although we can get QPS from s3_bytes_per_read, s3_bytes_per_read only
// records successful requests, while s3_get_request_qps records all requests.
bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader", "s3_get_request",
Contributor:
s3_bvar::s3_get_latency can get QPS/P99 latency altogether. It seems there is no need to add this bvar.

Contributor (Author):
s3_get_latency only records "success" requests.
s3_get_request_qps records both "success" and "failed" requests.

&s3_file_reader_read_counter);
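To make the distinction in the thread above concrete, a standalone sketch follows (not part of the patch; the demo_* names are made up, and it assumes the aggregate bvar/bvar.h header) of why a LatencyRecorder fed only on success cannot stand in for an all-requests QPS counter.

#include <bvar/bvar.h>

// Every attempt, successful or not, bumps this adder ...
bvar::Adder<uint64_t> demo_get_attempts("demo_s3", "get_attempts");
// ... so the derived per-second value counts all GET requests, including 429s.
bvar::PerSecond<bvar::Adder<uint64_t>> demo_get_qps("demo_s3", "get_qps", &demo_get_attempts);
// The latency recorder is only fed on success, so its QPS window misses failed requests.
bvar::LatencyRecorder demo_get_latency("demo_s3", "get_latency");

void demo_record(bool ok, int64_t latency_us) {
    demo_get_attempts << 1;             // counted whether or not the GET succeeded
    if (ok) {
        demo_get_latency << latency_us; // only successful GETs contribute here
    }
}
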

Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder> client,
std::string bucket, std::string key,
int64_t file_size) {
std::string bucket, std::string key, int64_t file_size,
RuntimeProfile* profile) {
if (file_size < 0) {
auto res = client->object_file_size(bucket, key);
if (!res.has_value()) {
@@ -62,16 +68,17 @@ Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder>
}

return std::make_shared<S3FileReader>(std::move(client), std::move(bucket), std::move(key),
file_size);
file_size, profile);
}

S3FileReader::S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket,
std::string key, size_t file_size)
std::string key, size_t file_size, RuntimeProfile* profile)
: _path(fmt::format("s3://{}/{}", bucket, key)),
_file_size(file_size),
_bucket(std::move(bucket)),
_key(std::move(key)),
_client(std::move(client)) {
_client(std::move(client)),
_profile(profile) {
DorisMetrics::instance()->s3_file_open_reading->increment(1);
DorisMetrics::instance()->s3_file_reader_total->increment(1);
s3_file_reader_total << 1;
@@ -113,23 +120,85 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
if (!client) {
return Status::InternalError("init s3 client error");
}
// clang-format off
auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
to, offset, bytes_req, bytes_read);
// clang-format on
if (resp.status.code != ErrorCode::OK) {
return std::move(Status(resp.status.code, std::move(resp.status.msg))
.append(fmt::format("failed to read from {}", _path.native())));
// // clang-format off
// auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
// to, offset, bytes_req, bytes_read);
// // clang-format on
// if (resp.status.code != ErrorCode::OK) {
// return std::move(Status(resp.status.code, std::move(resp.status.msg))
// .append(fmt::format("failed to read from {}", _path.native())));
// }
// if (*bytes_read != bytes_req) {
// return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
// _path.native(), *bytes_read, bytes_req);
SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);

int retry_count = 0;
const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds
const int max_retries = config::max_s3_client_retry; // Maximum number of retries on 429

int total_sleep_time = 0;
while (retry_count <= max_retries) {
s3_file_reader_read_counter << 1;
// clang-format off
auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
Contributor (Suggested change):
{
    SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
    auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
}

to, offset, bytes_req, bytes_read);
// clang-format on
_s3_stats.total_get_request_counter++;
if (resp.status.code != ErrorCode::OK) {
if (resp.http_code ==
static_cast<int>(Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS)) {
s3_file_reader_too_many_request_counter << 1;
retry_count++;
int wait_time = std::min(base_wait_time * (1 << retry_count),
max_wait_time); // Exponential backoff
std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
_s3_stats.too_many_request_err_counter++;
_s3_stats.too_many_request_sleep_time_ms += wait_time;
total_sleep_time += wait_time;
continue;
} else {
// Handle other errors
return std::move(Status(resp.status.code, std::move(resp.status.msg))
.append("failed to read"));
}
}
if (*bytes_read != bytes_req) {
return Status::InternalError("failed to read (bytes read: {}, bytes req: {})",
*bytes_read, bytes_req);
}
_s3_stats.total_bytes_read += bytes_req;
s3_bytes_read_total << bytes_req;
s3_bytes_per_read << bytes_req;
DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req);
if (retry_count > 0) {
LOG(INFO) << fmt::format("read s3 file {} succeeded after {} retries with {} ms sleeping",
_path.native(), retry_count, total_sleep_time);
}
return Status::OK();
}
if (*bytes_read != bytes_req) {
return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
_path.native(), *bytes_read, bytes_req);
return Status::InternalError("failed to read from s3, exceeded maximum retries");
}

void S3FileReader::_collect_profile_before_close() {
if (_profile != nullptr) {
const char* s3_profile_name = "S3Profile";
ADD_TIMER(_profile, s3_profile_name);
RuntimeProfile::Counter* total_get_request_counter =
ADD_CHILD_COUNTER(_profile, "TotalGetRequest", TUnit::UNIT, s3_profile_name);
RuntimeProfile::Counter* too_many_request_err_counter =
ADD_CHILD_COUNTER(_profile, "TooManyRequestErr", TUnit::UNIT, s3_profile_name);
RuntimeProfile::Counter* too_many_request_sleep_time = ADD_CHILD_COUNTER(
_profile, "TooManyRequestSleepTime", TUnit::TIME_MS, s3_profile_name);
RuntimeProfile::Counter* total_bytes_read =
ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, s3_profile_name);

COUNTER_UPDATE(total_get_request_counter, _s3_stats.total_get_request_counter);
COUNTER_UPDATE(too_many_request_err_counter, _s3_stats.too_many_request_err_counter);
COUNTER_UPDATE(too_many_request_sleep_time, _s3_stats.too_many_request_sleep_time_ms);
COUNTER_UPDATE(total_bytes_read, _s3_stats.total_bytes_read);
}
s3_bytes_read_total << *bytes_read;
s3_bytes_per_read << *bytes_read;
s3_file_reader_read_counter << 1;
DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
return Status::OK();
}

} // namespace doris::io
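Stripped of metrics and profile bookkeeping, the retry path added above reduces to the pattern below. This is an illustrative sketch, not the patch itself: fetch_once stands in for client->get_object, FetchResult for the Status/HTTP-code pair, and the default arguments mirror the new BE configs.

#include <algorithm>
#include <chrono>
#include <functional>
#include <thread>

// Illustrative outcomes; the real code inspects resp.status.code and resp.http_code.
enum class FetchResult { Ok, TooManyRequests, OtherError };

// Retry a fetch on HTTP 429 with capped exponential backoff; success or any
// other error ends the loop. The defaults mirror the new BE config values.
bool read_with_backoff(const std::function<FetchResult()>& fetch_once,
                       int max_retries = 10,     // max_s3_client_retry
                       int base_wait_ms = 100,   // s3_read_base_wait_time_ms
                       int max_wait_ms = 800) {  // s3_read_max_wait_time_ms
    int retry_count = 0;
    while (retry_count <= max_retries) {
        switch (fetch_once()) {
        case FetchResult::Ok:
            return true;                          // success, possibly after retries
        case FetchResult::TooManyRequests: {
            ++retry_count;
            int wait_ms = std::min(base_wait_ms * (1 << retry_count), max_wait_ms);
            std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
            continue;                             // try again after backing off
        }
        case FetchResult::OtherError:
            return false;                         // non-429 errors are not retried
        }
    }
    return false;                                 // exceeded maximum retries
}

In read_at_impl the success branch additionally updates _s3_stats and the bvar counters before returning.
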
24 changes: 20 additions & 4 deletions be/src/io/fs/s3_file_reader.h
@@ -28,16 +28,20 @@
#include "io/fs/s3_file_system.h"
#include "util/slice.h"

namespace doris::io {
namespace doris {
class RuntimeProfile;

namespace io {
struct IOContext;

class S3FileReader final : public FileReader {
public:
static Result<FileReaderSPtr> create(std::shared_ptr<const ObjClientHolder> client,
std::string bucket, std::string key, int64_t file_size);
std::string bucket, std::string key, int64_t file_size,
RuntimeProfile* profile);

S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket, std::string key,
size_t file_size);
size_t file_size, RuntimeProfile* profile);

~S3FileReader() override;

@@ -53,7 +57,15 @@ class S3FileReader final : public FileReader {
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;

void _collect_profile_before_close() override;

private:
struct S3Statistics {
int64_t total_get_request_counter = 0;
int64_t too_many_request_err_counter = 0;
int64_t too_many_request_sleep_time_ms = 0;
int64_t total_bytes_read = 0;
};
Path _path;
size_t _file_size;

@@ -62,6 +74,10 @@
std::shared_ptr<const ObjClientHolder> _client;

std::atomic<bool> _closed = false;

RuntimeProfile* _profile = nullptr;
S3Statistics _s3_stats;
};

} // namespace doris::io
} // namespace io
} // namespace doris
2 changes: 1 addition & 1 deletion be/src/io/fs/s3_file_system.cpp
@@ -196,7 +196,7 @@ Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
Status S3FileSystem::open_file_internal(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions& opts) {
auto key = DORIS_TRY(get_key(file));
*reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size));
*reader = DORIS_TRY(S3FileReader::create(_client, _bucket, key, opts.file_size, nullptr));
return Status::OK();
}
