Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1226,7 +1226,8 @@ void TaskWorkerPool::_push_storage_policy_worker_thread_callback() {
s3_conf.use_virtual_addressing = !resource.s3_storage_param.use_path_style;
std::shared_ptr<io::S3FileSystem> fs;
if (existed_resource.fs == nullptr) {
st = io::S3FileSystem::create(s3_conf, std::to_string(resource.id), &fs);
st = io::S3FileSystem::create(s3_conf, std::to_string(resource.id), nullptr,
&fs);
} else {
fs = std::static_pointer_cast<io::S3FileSystem>(existed_resource.fs);
st = fs->set_conf(s3_conf);
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,9 @@ DEFINE_mInt32(max_s3_client_retry, "10");

DEFINE_String(trino_connector_plugin_dir, "${DORIS_HOME}/connectors");

DEFINE_mInt32(s3_read_base_wait_time_ms, "100");
DEFINE_mInt32(s3_read_max_wait_time_ms, "800");

// ca_cert_file is in this path by default, Normally no modification is required
// ca cert default path is different from different OS
DEFINE_mString(ca_cert_file_paths,
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,13 @@ DECLARE_mBool(check_segment_when_build_rowset_meta);

// max s3 client retry times
DECLARE_mInt32(max_s3_client_retry);
// When meet s3 429 error, the "get" request will
// sleep s3_read_base_wait_time_ms (*1, *2, *3, *4) ms
// get try again.
// The max sleep time is s3_read_max_wait_time_ms
// and the max retry time is max_s3_client_retry
DECLARE_mInt32(s3_read_base_wait_time_ms);
DECLARE_mInt32(s3_read_max_wait_time_ms);

// the directory for storing the trino-connector plugins.
DECLARE_String(trino_connector_plugin_dir);
Expand Down
8 changes: 4 additions & 4 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
RETURN_IF_ERROR(
S3ClientFactory::convert_properties_to_s3_conf(properties, s3_uri, &s3_conf));
std::shared_ptr<io::S3FileSystem> fs;
RETURN_IF_ERROR(io::S3FileSystem::create(s3_conf, "", &fs));
RETURN_IF_ERROR(io::S3FileSystem::create(s3_conf, "", nullptr, &fs));
RETURN_IF_ERROR(fs->create_file(path, &file_writer));
break;
}
Expand Down Expand Up @@ -122,7 +122,7 @@ Status FileFactory::create_file_reader(const io::FileSystemProperties& system_pr
}
case TFileType::FILE_S3: {
RETURN_IF_ERROR(create_s3_reader(system_properties.properties, file_description,
reader_options, file_system, file_reader));
reader_options, file_system, file_reader, profile));
break;
}
case TFileType::FILE_HDFS: {
Expand Down Expand Up @@ -191,13 +191,13 @@ Status FileFactory::create_s3_reader(const std::map<std::string, std::string>& p
const io::FileDescription& fd,
const io::FileReaderOptions& reader_options,
std::shared_ptr<io::FileSystem>* s3_file_system,
io::FileReaderSPtr* reader) {
io::FileReaderSPtr* reader, RuntimeProfile* profile) {
S3URI s3_uri(fd.path);
RETURN_IF_ERROR(s3_uri.parse());
S3Conf s3_conf;
RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(prop, s3_uri, &s3_conf));
std::shared_ptr<io::S3FileSystem> fs;
RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", &fs));
RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", profile, &fs));
RETURN_IF_ERROR(fs->open_file(fd, reader_options, reader));
*s3_file_system = std::move(fs);
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/file_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class FileFactory {
const io::FileDescription& fd,
const io::FileReaderOptions& reader_options,
std::shared_ptr<io::FileSystem>* s3_file_system,
io::FileReaderSPtr* reader);
io::FileReaderSPtr* reader, RuntimeProfile* profile);

static Status create_broker_reader(const TNetworkAddress& broker_addr,
const std::map<std::string, std::string>& prop,
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/fs/benchmark/s3_benchmark.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class S3Benchmark : public BaseBenchmark {
S3Conf s3_conf;
RETURN_IF_ERROR(
S3ClientFactory::convert_properties_to_s3_conf(_conf_map, s3_uri, &s3_conf));
return io::S3FileSystem::create(std::move(s3_conf), "", fs);
return io::S3FileSystem::create(std::move(s3_conf), "", nullptr, fs);
}
};

Expand Down Expand Up @@ -70,7 +70,7 @@ class S3OpenReadBenchmark : public S3Benchmark {
fd.path = file_path;
RETURN_IF_ERROR(FileFactory::create_s3_reader(
_conf_map, fd, reader_opts, reinterpret_cast<std::shared_ptr<io::FileSystem>*>(&fs),
&reader));
&reader, nullptr));
return read(state, reader);
}
};
Expand Down
109 changes: 92 additions & 17 deletions be/src/io/fs/s3_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "io/fs/s3_common.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/s3_util.h"

namespace doris {
namespace io {
Expand All @@ -41,14 +43,23 @@ bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
bvar::Adder<uint64_t> s3_file_being_read("s3_file_reader", "file_being_read");
bvar::Adder<uint64_t> s3_file_reader_too_many_request_counter("s3_file_reader", "too_many_request");
bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS
bvar::PerSecond<bvar::Adder<uint64_t>> s3_read_througthput("s3_file_reader", "s3_read_throughput",
&s3_bytes_read_total);
// Although we can get QPS from s3_bytes_per_read, but s3_bytes_per_read only
// record successfull request, and s3_get_request_qps will record all request.
bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader", "s3_get_request",
&s3_file_reader_read_counter);

S3FileReader::S3FileReader(Path path, size_t file_size, std::string key, std::string bucket,
std::shared_ptr<S3FileSystem> fs)
std::shared_ptr<S3FileSystem> fs, RuntimeProfile* profile)
: _path(std::move(path)),
_file_size(file_size),
_fs(std::move(fs)),
_bucket(std::move(bucket)),
_key(std::move(key)) {
_key(std::move(key)),
_profile(profile) {
DorisMetrics::instance()->s3_file_open_reading->increment(1);
DorisMetrics::instance()->s3_file_reader_total->increment(1);
s3_file_reader_total << 1;
Expand Down Expand Up @@ -94,23 +105,87 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
if (!client) {
return Status::InternalError("init s3 client error");
}
auto outcome = client->GetObject(request);
if (!outcome.IsSuccess()) {
return Status::IOError(
"failed to read from {}: {}, exception {}, error code {}, request id {}",
_path.native(), outcome.GetError().GetMessage(),
outcome.GetError().GetExceptionName(), outcome.GetError().GetResponseCode(),
outcome.GetError().GetRequestId());
// auto outcome = client->GetObject(request);
// if (!outcome.IsSuccess()) {
// return Status::IOError(
// "failed to read from {}: {}, exception {}, error code {}, request id {}",
// _path.native(), outcome.GetError().GetMessage(),
// outcome.GetError().GetExceptionName(), outcome.GetError().GetResponseCode(),
// outcome.GetError().GetRequestId());
// }
// *bytes_read = outcome.GetResult().GetContentLength();
// if (*bytes_read != bytes_req) {
// return Status::IOError("failed to read from {}(bytes read: {}, bytes req: {})",
// _path.native(), *bytes_read, bytes_req);
// SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);

int retry_count = 0;
const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds
const int max_retries = config::max_s3_client_retry; // wait 1s, 2s, 4s, 8s for each backoff

int total_sleep_time = 0;
while (retry_count <= max_retries) {
s3_file_reader_read_counter << 1;
auto outcome = client->GetObject(request);
_s3_stats.total_get_request_counter++;
if (!outcome.IsSuccess()) {
auto error = outcome.GetError();
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS) {
s3_file_reader_too_many_request_counter << 1;
retry_count++;
int wait_time = std::min(base_wait_time * (1 << retry_count),
max_wait_time); // Exponential backoff
std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
_s3_stats.too_many_request_err_counter++;
_s3_stats.too_many_request_sleep_time_ms += wait_time;
total_sleep_time += wait_time;
continue;
} else {
// Handle other errors
return Status::IOError(
"failed to read. msg: {}, exception: {}, error code: {}, request id: {}",
_path.native(), outcome.GetError().GetMessage(),
outcome.GetError().GetExceptionName(), outcome.GetError().GetResponseCode(),
outcome.GetError().GetRequestId());
}
}
*bytes_read = outcome.GetResult().GetContentLength();
if (*bytes_read != bytes_req) {
return Status::InternalError("failed to read (bytes read: {}, bytes req: {})",
*bytes_read, bytes_req);
}
_s3_stats.total_bytes_read += bytes_req;
s3_bytes_read_total << bytes_req;
s3_bytes_per_read << bytes_req;
DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req);
if (retry_count > 0) {
LOG(INFO) << fmt::format("read s3 file {} succeed after {} times with {} ms sleeping",
_path.native(), retry_count, total_sleep_time);
}
return Status::OK();
}
*bytes_read = outcome.GetResult().GetContentLength();
if (*bytes_read != bytes_req) {
return Status::IOError("failed to read from {}(bytes read: {}, bytes req: {})",
_path.native(), *bytes_read, bytes_req);
return Status::InternalError("failed to read from s3, exceeded maximum retries");
}

void S3FileReader::_collect_profile_before_close() {
if (_profile != nullptr) {
const char* s3_profile_name = "S3Profile";
ADD_TIMER(_profile, s3_profile_name);
RuntimeProfile::Counter* total_get_request_counter =
ADD_CHILD_COUNTER(_profile, "TotalGetRequest", TUnit::UNIT, s3_profile_name);
RuntimeProfile::Counter* too_many_request_err_counter =
ADD_CHILD_COUNTER(_profile, "TooManyRequestErr", TUnit::UNIT, s3_profile_name);
RuntimeProfile::Counter* too_many_request_sleep_time = ADD_CHILD_COUNTER(
_profile, "TooManyRequestSleepTime", TUnit::TIME_MS, s3_profile_name);
RuntimeProfile::Counter* total_bytes_read =
ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, s3_profile_name);

COUNTER_UPDATE(total_get_request_counter, _s3_stats.total_get_request_counter);
COUNTER_UPDATE(too_many_request_err_counter, _s3_stats.too_many_request_err_counter);
COUNTER_UPDATE(too_many_request_sleep_time, _s3_stats.too_many_request_sleep_time_ms);
COUNTER_UPDATE(total_bytes_read, _s3_stats.total_bytes_read);
}
s3_bytes_read_total << *bytes_read;
s3_file_reader_read_counter << 1;
DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
return Status::OK();
}

} // namespace io
Expand Down
15 changes: 14 additions & 1 deletion be/src/io/fs/s3_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
#include "util/slice.h"

namespace doris {

class RuntimeProfile;

namespace io {
class IOContext;

class S3FileReader final : public FileReader {
public:
S3FileReader(Path path, size_t file_size, std::string key, std::string bucket,
std::shared_ptr<S3FileSystem> fs);
std::shared_ptr<S3FileSystem> fs, RuntimeProfile* profile);

~S3FileReader() override;

Expand All @@ -55,14 +58,24 @@ class S3FileReader final : public FileReader {
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;

void _collect_profile_before_close() override;

private:
struct S3Statistics {
int64_t total_get_request_counter = 0;
int64_t too_many_request_err_counter = 0;
int64_t too_many_request_sleep_time_ms = 0;
int64_t total_bytes_read = 0;
};
Path _path;
size_t _file_size;
std::shared_ptr<S3FileSystem> _fs;

std::string _bucket;
std::string _key;
std::atomic<bool> _closed = false;
RuntimeProfile* _profile = nullptr;
S3Statistics _s3_stats;
};

} // namespace io
Expand Down
13 changes: 8 additions & 5 deletions be/src/io/fs/s3_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_reader.h"
#include "io/fs/s3_file_writer.h"
#include "util/runtime_profile.h"
#include "util/s3_uri.h"
#include "util/s3_util.h"

Expand Down Expand Up @@ -119,14 +120,16 @@ Status S3FileSystem::set_conf(S3Conf s3_conf) {
return Status::OK();
}

Status S3FileSystem::create(S3Conf s3_conf, std::string id, std::shared_ptr<S3FileSystem>* fs) {
(*fs).reset(new S3FileSystem(std::move(s3_conf), std::move(id)));
Status S3FileSystem::create(S3Conf s3_conf, std::string id, RuntimeProfile* profile,
std::shared_ptr<S3FileSystem>* fs) {
(*fs).reset(new S3FileSystem(std::move(s3_conf), std::move(id), profile));
return (*fs)->connect();
}

S3FileSystem::S3FileSystem(S3Conf&& s3_conf, std::string&& id)
S3FileSystem::S3FileSystem(S3Conf&& s3_conf, std::string&& id, RuntimeProfile* profile)
: RemoteFileSystem(s3_conf.prefix, std::move(id), FileSystemType::S3),
_s3_conf(std::move(s3_conf)) {
_s3_conf(std::move(s3_conf)),
_profile(profile) {
// remove the first and last '/'
if (!_s3_conf.prefix.empty()) {
if (_s3_conf.prefix[0] == '/') {
Expand Down Expand Up @@ -168,7 +171,7 @@ Status S3FileSystem::open_file_internal(const FileDescription& fd, const Path& a
auto fs_path = Path(_s3_conf.endpoint) / _s3_conf.bucket / key;
*reader = std::make_shared<S3FileReader>(
std::move(fs_path), fsize, std::move(key), _s3_conf.bucket,
std::static_pointer_cast<S3FileSystem>(shared_from_this()));
std::static_pointer_cast<S3FileSystem>(shared_from_this()), _profile);
return Status::OK();
}

Expand Down
8 changes: 6 additions & 2 deletions be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class PooledThreadExecutor;
} // namespace Aws::Utils::Threading

namespace doris {
class RuntimeProfile;

namespace io {
struct FileInfo;

Expand All @@ -56,7 +58,8 @@ struct FileInfo;
// This class is thread-safe.(Except `set_xxx` method)
class S3FileSystem final : public RemoteFileSystem {
public:
static Status create(S3Conf s3_conf, std::string id, std::shared_ptr<S3FileSystem>* fs);
static Status create(S3Conf s3_conf, std::string id, RuntimeProfile* profile,
std::shared_ptr<S3FileSystem>* fs);
~S3FileSystem() override;
// Guarded by external lock.
Status set_conf(S3Conf s3_conf);
Expand Down Expand Up @@ -105,7 +108,7 @@ class S3FileSystem final : public RemoteFileSystem {
}

private:
S3FileSystem(S3Conf&& s3_conf, std::string&& id);
S3FileSystem(S3Conf&& s3_conf, std::string&& id, RuntimeProfile* profile);

template <typename AwsOutcome>
std::string error_msg(const std::string& key, const AwsOutcome& outcome) const;
Expand All @@ -122,6 +125,7 @@ class S3FileSystem final : public RemoteFileSystem {
mutable std::mutex _client_mu;
std::shared_ptr<Aws::S3::S3Client> _client;
std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> _executor;
RuntimeProfile* _profile = nullptr;
};

} // namespace io
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l
RETURN_IF_ERROR(s3_uri.parse());
RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(_prop, s3_uri, &s3_conf));
std::shared_ptr<io::S3FileSystem> fs;
RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", &fs));
RETURN_IF_ERROR(io::S3FileSystem::create(std::move(s3_conf), "", nullptr, &fs));
_remote_fs = std::move(fs);
} else if (TStorageBackendType::type::HDFS == type) {
THdfsParams hdfs_params = parse_properties(_prop);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/runtime/vfile_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ Status VFileResultWriter::_delete_dir() {
std::shared_ptr<io::S3FileSystem> s3_fs = nullptr;
RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(
_file_opts->broker_properties, s3_uri, &s3_conf));
RETURN_IF_ERROR(io::S3FileSystem::create(s3_conf, "", &s3_fs));
RETURN_IF_ERROR(io::S3FileSystem::create(s3_conf, "", nullptr, &s3_fs));
file_system = s3_fs;
break;
}
Expand Down
2 changes: 1 addition & 1 deletion be/test/io/cache/remote_file_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class RemoteFileCacheTest : public ::testing::Test {
// just use to create s3 filesystem, otherwise won't use cache
S3Conf s3_conf;
std::shared_ptr<io::S3FileSystem> fs;
Status st = io::S3FileSystem::create(std::move(s3_conf), resource_id, &fs);
Status st = io::S3FileSystem::create(std::move(s3_conf), resource_id, nullptr, &fs);
// io::S3FileSystem::create will call connect, which will fail because s3_conf is empty.
// but it does affect the following unit test
ASSERT_FALSE(st.ok()) << st;
Expand Down
2 changes: 1 addition & 1 deletion be/test/io/fs/remote_file_system_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ TEST_F(RemoteFileSystemTest, TestS3FileSystem) {
CHECK_STATUS_OK(s3_uri.parse());
CHECK_STATUS_OK(S3ClientFactory::convert_properties_to_s3_conf(s3_prop, s3_uri, &s3_conf));
std::shared_ptr<io::S3FileSystem> fs;
CHECK_STATUS_OK(io::S3FileSystem::create(std::move(s3_conf), "", &fs));
CHECK_STATUS_OK(io::S3FileSystem::create(std::move(s3_conf), "", nullptr, &fs));

// delete directory
io::Path delete_path = s3_location + "/tmp1";
Expand Down
6 changes: 3 additions & 3 deletions be/test/olap/remote_rowset_gc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ class RemoteRowsetGcTest : public testing::Test {
s3_conf.bucket = config::test_s3_bucket;
s3_conf.prefix = "remote_rowset_gc_test";
std::shared_ptr<io::S3FileSystem> s3_fs;
ASSERT_TRUE(
io::S3FileSystem::create(std::move(s3_conf), std::to_string(kResourceId), &s3_fs)
.ok());
ASSERT_TRUE(io::S3FileSystem::create(std::move(s3_conf), std::to_string(kResourceId),
nullptr, &s3_fs)
.ok());
put_storage_resource(kResourceId, {s3_fs, 1});
auto storage_policy = std::make_shared<StoragePolicy>();
storage_policy->name = "TabletCooldownTest";
Expand Down
Loading