diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 2ca32cbacaf352..5655b4392bf695 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -123,14 +123,10 @@ Result FileFactory::create_file_writer( } case TFileType::FILE_HDFS: { THdfsParams hdfs_params = parse_properties(properties); - io::HdfsHandler* handler; + std::shared_ptr handler; RETURN_IF_ERROR_RESULT(io::HdfsHandlerCache::instance()->get_connection( hdfs_params, hdfs_params.fs_name, &handler)); - auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options); - if (!res.has_value()) { - handler->dec_ref(); - } - return res; + return io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options); } default: return ResultError( @@ -165,7 +161,7 @@ Result FileFactory::create_file_reader( }); } case TFileType::FILE_HDFS: { - io::HdfsHandler* handler; + std::shared_ptr handler; // FIXME(plat1ko): Explain the difference between `system_properties.hdfs_params.fs_name` // and `file_description.fs_name`, it's so confused. const auto* fs_name = &file_description.fs_name; diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index f86b3ef588afb3..24976f11941e3a 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -86,15 +86,7 @@ HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_na } } -HdfsFileSystem::~HdfsFileSystem() { - if (_fs_handle != nullptr) { - if (_fs_handle->from_cache) { - _fs_handle->dec_ref(); - } else { - delete _fs_handle; - } - } -} +HdfsFileSystem::~HdfsFileSystem() = default; Status HdfsFileSystem::init() { RETURN_IF_ERROR( @@ -107,13 +99,11 @@ Status HdfsFileSystem::init() { Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, const FileWriterOptions* opts) { - _fs_handle->inc_ref(); auto res = io::HdfsFileWriter::create(file, _fs_handle, _fs_name, opts); if (res.has_value()) { *writer = std::move(res).value(); return Status::OK(); } else { - _fs_handle->dec_ref(); return std::move(res).error(); } } diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index 23ae65b0820ef4..f7c9ea223151a9 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -88,9 +88,7 @@ class HdfsFileSystem final : public RemoteFileSystem { RuntimeProfile* profile, std::string root_path); const THdfsParams& _hdfs_params; // Only used in init, so we can use reference here std::string _fs_name; - // do not use std::shared_ptr or std::unique_ptr - // _fs_handle is managed by HdfsFileSystemCache - HdfsHandler* _fs_handle = nullptr; + std::shared_ptr _fs_handle = nullptr; RuntimeProfile* _profile = nullptr; }; } // namespace io diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index b47e4c3f2d2ebe..2444ab3d1f57da 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -45,10 +45,10 @@ bvar::Adder hdfs_file_being_written("hdfs_file_writer_file_being_writt static constexpr size_t MB = 1024 * 1024; -HdfsFileWriter::HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, +HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr handler, hdfsFile hdfs_file, std::string fs_name, const FileWriterOptions* opts) : _path(std::move(path)), - _hdfs_handler(handler), + _hdfs_handler(std::move(handler)), _hdfs_file(hdfs_file), _fs_name(std::move(fs_name)), _sync_file_data(opts ? opts->sync_file_data : true), @@ -70,11 +70,6 @@ HdfsFileWriter::~HdfsFileWriter() { hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); } - if (_hdfs_handler->from_cache) { - _hdfs_handler->dec_ref(); - } else { - delete _hdfs_handler; - } hdfs_file_being_written << -1; } @@ -266,7 +261,7 @@ Status HdfsFileWriter::finalize() { return Status::OK(); } -Result HdfsFileWriter::create(Path full_path, HdfsHandler* handler, +Result HdfsFileWriter::create(Path full_path, std::shared_ptr handler, const std::string& fs_name, const FileWriterOptions* opts) { auto path = convert_path(full_path, fs_name); diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index e6aa623cada162..f6fa66c5dbed26 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -36,11 +36,12 @@ class HdfsFileWriter final : public FileWriter { // - fs_name/path_to_file // - /path_to_file // TODO(plat1ko): Support related path for cloud mode - static Result create(Path path, HdfsHandler* handler, const std::string& fs_name, + static Result create(Path path, std::shared_ptr handler, + const std::string& fs_name, const FileWriterOptions* opts = nullptr); - HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name, - const FileWriterOptions* opts = nullptr); + HdfsFileWriter(Path path, std::shared_ptr handler, hdfsFile hdfs_file, + std::string fs_name, const FileWriterOptions* opts = nullptr); ~HdfsFileWriter() override; Status close() override; @@ -59,7 +60,7 @@ class HdfsFileWriter final : public FileWriter { Status _append(std::string_view content); Path _path; - HdfsHandler* _hdfs_handler = nullptr; + std::shared_ptr _hdfs_handler = nullptr; hdfsFile _hdfs_file = nullptr; std::string _fs_name; size_t _bytes_appended = 0; diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp index 0ae5d2f371b540..a9563a821395a7 100644 --- a/be/src/io/hdfs_util.cpp +++ b/be/src/io/hdfs_util.cpp @@ -81,7 +81,7 @@ bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync"); void HdfsHandlerCache::_clean_invalid() { std::vector removed_handle; for (auto& item : _cache) { - if (item.second->invalid() && item.second->ref_cnt() == 0) { + if (item.second.use_count() == 1 && item.second->invalid()) { removed_handle.emplace_back(item.first); } } @@ -94,7 +94,7 @@ void HdfsHandlerCache::_clean_oldest() { uint64_t oldest_time = ULONG_MAX; uint64 oldest = 0; for (auto& item : _cache) { - if (item.second->ref_cnt() == 0 && item.second->last_access_time() < oldest_time) { + if (item.second.use_count() == 1 && item.second->last_access_time() < oldest_time) { oldest_time = item.second->last_access_time(); oldest = item.first; } @@ -103,16 +103,16 @@ void HdfsHandlerCache::_clean_oldest() { } Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const std::string& fs_name, - HdfsHandler** fs_handle) { + std::shared_ptr* fs_handle) { uint64 hash_code = hdfs_hash_code(hdfs_params); { std::lock_guard l(_lock); auto it = _cache.find(hash_code); if (it != _cache.end()) { - HdfsHandler* handle = it->second.get(); + std::shared_ptr handle = it->second; if (!handle->invalid()) { - handle->inc_ref(); - *fs_handle = handle; + handle->update_last_access_time(); + *fs_handle = std::move(handle); return Status::OK(); } // fs handle is invalid, erase it. @@ -129,12 +129,12 @@ Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const st _clean_oldest(); } if (_cache.size() < MAX_CACHE_HANDLE) { - std::unique_ptr handle = std::make_unique(hdfs_fs, true); - handle->inc_ref(); - *fs_handle = handle.get(); + auto handle = std::make_shared(hdfs_fs, true); + handle->update_last_access_time(); + *fs_handle = handle; _cache[hash_code] = std::move(handle); } else { - *fs_handle = new HdfsHandler(hdfs_fs, false); + *fs_handle = std::make_shared(hdfs_fs, false); } } return Status::OK(); diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h index f450063c7dc149..c8d25d1f30aa4f 100644 --- a/be/src/io/hdfs_util.h +++ b/be/src/io/hdfs_util.h @@ -54,7 +54,6 @@ class HdfsHandler { HdfsHandler(hdfsFS fs, bool cached) : hdfs_fs(fs), from_cache(cached), - _ref_cnt(0), _create_time(std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count()), @@ -62,7 +61,6 @@ class HdfsHandler { _invalid(false) {} ~HdfsHandler() { - DCHECK(_ref_cnt == 0); if (hdfs_fs != nullptr) { // DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed" // even if we create a new one @@ -73,17 +71,14 @@ class HdfsHandler { int64_t last_access_time() { return _last_access_time; } - void inc_ref() { - _ref_cnt++; - _last_access_time = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + void update_last_access_time() { + if (from_cache) { + _last_access_time = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + } } - void dec_ref() { _ref_cnt--; } - - int ref_cnt() { return _ref_cnt; } - bool invalid() { return _invalid; } void set_invalid() { _invalid = true; } @@ -94,8 +89,6 @@ class HdfsHandler { const bool from_cache; private: - // the number of referenced client - std::atomic _ref_cnt; // For kerberos authentication, we need to save create time so that // we can know if the kerberos ticket is expired. std::atomic _create_time; @@ -118,13 +111,13 @@ class HdfsHandlerCache { // This function is thread-safe Status get_connection(const THdfsParams& hdfs_params, const std::string& fs_name, - HdfsHandler** fs_handle); + std::shared_ptr* fs_handle); private: static constexpr int MAX_CACHE_HANDLE = 64; std::mutex _lock; - std::unordered_map> _cache; + std::unordered_map> _cache; HdfsHandlerCache() = default;