Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,10 @@ Result<io::FileWriterPtr> FileFactory::create_file_writer(
}
case TFileType::FILE_HDFS: {
THdfsParams hdfs_params = parse_properties(properties);
io::HdfsHandler* handler;
std::shared_ptr<io::HdfsHandler> handler;
RETURN_IF_ERROR_RESULT(io::HdfsHandlerCache::instance()->get_connection(
hdfs_params, hdfs_params.fs_name, &handler));
auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options);
if (!res.has_value()) {
handler->dec_ref();
}
return res;
return io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options);
}
default:
return ResultError(
Expand Down Expand Up @@ -165,7 +161,7 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
});
}
case TFileType::FILE_HDFS: {
io::HdfsHandler* handler;
std::shared_ptr<io::HdfsHandler> handler;
// FIXME(plat1ko): Explain the difference between `system_properties.hdfs_params.fs_name`
// and `file_description.fs_name`, it's so confused.
const auto* fs_name = &file_description.fs_name;
Expand Down
12 changes: 1 addition & 11 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,7 @@ HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_na
}
}

HdfsFileSystem::~HdfsFileSystem() {
if (_fs_handle != nullptr) {
if (_fs_handle->from_cache) {
_fs_handle->dec_ref();
} else {
delete _fs_handle;
}
}
}
HdfsFileSystem::~HdfsFileSystem() = default;

Status HdfsFileSystem::init() {
RETURN_IF_ERROR(
Expand All @@ -107,13 +99,11 @@ Status HdfsFileSystem::init() {

Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
_fs_handle->inc_ref();
auto res = io::HdfsFileWriter::create(file, _fs_handle, _fs_name, opts);
if (res.has_value()) {
*writer = std::move(res).value();
return Status::OK();
} else {
_fs_handle->dec_ref();
return std::move(res).error();
}
}
Expand Down
4 changes: 1 addition & 3 deletions be/src/io/fs/hdfs_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ class HdfsFileSystem final : public RemoteFileSystem {
RuntimeProfile* profile, std::string root_path);
const THdfsParams& _hdfs_params; // Only used in init, so we can use reference here
std::string _fs_name;
// do not use std::shared_ptr or std::unique_ptr
// _fs_handle is managed by HdfsFileSystemCache
HdfsHandler* _fs_handle = nullptr;
std::shared_ptr<HdfsHandler> _fs_handle = nullptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::shared_ptr<HdfsHandler> _fs_handle = nullptr;
std::shared_ptr<HdfsHandler> _fs_handle;

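可以不用显式写 = nullptr,智能指针默认初始化都是 nullptr
(English: There is no need to write `= nullptr` explicitly; smart pointers are default-initialized to nullptr.)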

RuntimeProfile* _profile = nullptr;
};
} // namespace io
Expand Down
11 changes: 3 additions & 8 deletions be/src/io/fs/hdfs_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_writt

static constexpr size_t MB = 1024 * 1024;

HdfsFileWriter::HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file,
HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, hdfsFile hdfs_file,
std::string fs_name, const FileWriterOptions* opts)
: _path(std::move(path)),
_hdfs_handler(handler),
_hdfs_handler(std::move(handler)),
_hdfs_file(hdfs_file),
_fs_name(std::move(fs_name)),
_sync_file_data(opts ? opts->sync_file_data : true),
Expand All @@ -70,11 +70,6 @@ HdfsFileWriter::~HdfsFileWriter() {
hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
}

if (_hdfs_handler->from_cache) {
_hdfs_handler->dec_ref();
} else {
delete _hdfs_handler;
}
hdfs_file_being_written << -1;
}

Expand Down Expand Up @@ -266,7 +261,7 @@ Status HdfsFileWriter::finalize() {
return Status::OK();
}

Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handler,
Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, std::shared_ptr<HdfsHandler> handler,
const std::string& fs_name,
const FileWriterOptions* opts) {
auto path = convert_path(full_path, fs_name);
Expand Down
9 changes: 5 additions & 4 deletions be/src/io/fs/hdfs_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ class HdfsFileWriter final : public FileWriter {
// - fs_name/path_to_file
// - /path_to_file
// TODO(plat1ko): Support related path for cloud mode
static Result<FileWriterPtr> create(Path path, HdfsHandler* handler, const std::string& fs_name,
static Result<FileWriterPtr> create(Path path, std::shared_ptr<HdfsHandler> handler,
const std::string& fs_name,
const FileWriterOptions* opts = nullptr);

HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name,
const FileWriterOptions* opts = nullptr);
HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, hdfsFile hdfs_file,
std::string fs_name, const FileWriterOptions* opts = nullptr);
~HdfsFileWriter() override;

Status close() override;
Expand All @@ -59,7 +60,7 @@ class HdfsFileWriter final : public FileWriter {
Status _append(std::string_view content);

Path _path;
HdfsHandler* _hdfs_handler = nullptr;
std::shared_ptr<HdfsHandler> _hdfs_handler = nullptr;
hdfsFile _hdfs_file = nullptr;
std::string _fs_name;
size_t _bytes_appended = 0;
Expand Down
20 changes: 10 additions & 10 deletions be/src/io/hdfs_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync");
void HdfsHandlerCache::_clean_invalid() {
std::vector<uint64> removed_handle;
for (auto& item : _cache) {
if (item.second->invalid() && item.second->ref_cnt() == 0) {
if (item.second.use_count() == 1 && item.second->invalid()) {
removed_handle.emplace_back(item.first);
}
}
Expand All @@ -94,7 +94,7 @@ void HdfsHandlerCache::_clean_oldest() {
uint64_t oldest_time = ULONG_MAX;
uint64 oldest = 0;
for (auto& item : _cache) {
if (item.second->ref_cnt() == 0 && item.second->last_access_time() < oldest_time) {
if (item.second.use_count() == 1 && item.second->last_access_time() < oldest_time) {
oldest_time = item.second->last_access_time();
oldest = item.first;
}
Expand All @@ -103,16 +103,16 @@ void HdfsHandlerCache::_clean_oldest() {
}

Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const std::string& fs_name,
HdfsHandler** fs_handle) {
std::shared_ptr<HdfsHandler>* fs_handle) {
uint64 hash_code = hdfs_hash_code(hdfs_params);
{
std::lock_guard<std::mutex> l(_lock);
auto it = _cache.find(hash_code);
if (it != _cache.end()) {
HdfsHandler* handle = it->second.get();
std::shared_ptr<HdfsHandler> handle = it->second;
if (!handle->invalid()) {
handle->inc_ref();
*fs_handle = handle;
handle->update_last_access_time();
*fs_handle = std::move(handle);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we still have to deal with inc_ref() even with shared_ptr support?
Can we integrate the process (inc_ref, dec_ref) into shared_ptr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is for the fs handle cache.

return Status::OK();
}
// fs handle is invalid, erase it.
Expand All @@ -129,12 +129,12 @@ Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const st
_clean_oldest();
}
if (_cache.size() < MAX_CACHE_HANDLE) {
std::unique_ptr<HdfsHandler> handle = std::make_unique<HdfsHandler>(hdfs_fs, true);
handle->inc_ref();
*fs_handle = handle.get();
auto handle = std::make_shared<HdfsHandler>(hdfs_fs, true);
handle->update_last_access_time();
*fs_handle = handle;
_cache[hash_code] = std::move(handle);
} else {
*fs_handle = new HdfsHandler(hdfs_fs, false);
*fs_handle = std::make_shared<HdfsHandler>(hdfs_fs, false);
}
}
return Status::OK();
Expand Down
23 changes: 8 additions & 15 deletions be/src/io/hdfs_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,13 @@ class HdfsHandler {
HdfsHandler(hdfsFS fs, bool cached)
: hdfs_fs(fs),
from_cache(cached),
_ref_cnt(0),
_create_time(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count()),
_last_access_time(0),
_invalid(false) {}

~HdfsHandler() {
DCHECK(_ref_cnt == 0);
if (hdfs_fs != nullptr) {
// DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed"
// even if we create a new one
Expand All @@ -73,17 +71,14 @@ class HdfsHandler {

int64_t last_access_time() { return _last_access_time; }

void inc_ref() {
_ref_cnt++;
_last_access_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
void update_last_access_time() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'update_last_access_time' can be made const [readability-make-member-function-const]

Suggested change
void update_last_access_time() {
void update_last_access_time() const {

if (from_cache) {
_last_access_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
}
}

void dec_ref() { _ref_cnt--; }

int ref_cnt() { return _ref_cnt; }

bool invalid() { return _invalid; }

void set_invalid() { _invalid = true; }
Expand All @@ -94,8 +89,6 @@ class HdfsHandler {
const bool from_cache;

private:
// the number of referenced client
std::atomic<int> _ref_cnt;
// For kerberos authentication, we need to save create time so that
// we can know if the kerberos ticket is expired.
std::atomic<uint64_t> _create_time;
Expand All @@ -118,13 +111,13 @@ class HdfsHandlerCache {

// This function is thread-safe
Status get_connection(const THdfsParams& hdfs_params, const std::string& fs_name,
HdfsHandler** fs_handle);
std::shared_ptr<HdfsHandler>* fs_handle);

private:
static constexpr int MAX_CACHE_HANDLE = 64;

std::mutex _lock;
std::unordered_map<uint64_t, std::unique_ptr<HdfsHandler>> _cache;
std::unordered_map<uint64_t, std::shared_ptr<HdfsHandler>> _cache;

HdfsHandlerCache() = default;

Expand Down