diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index e41671e493b384..1340a3078e28e4 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -71,11 +71,11 @@ class HdfsFileSystemCache { // This function is thread-safe Status get_connection(const THdfsParams& hdfs_params, const std::string& fs_name, - HdfsFileSystemHandle** fs_handle); + std::shared_ptr* fs_handle); private: std::mutex _lock; - std::unordered_map> _cache; + std::unordered_map> _cache; HdfsFileSystemCache() = default; @@ -148,15 +148,7 @@ HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string id, } } -HdfsFileSystem::~HdfsFileSystem() { - if (_fs_handle != nullptr) { - if (_fs_handle->from_cache) { - _fs_handle->dec_ref(); - } else { - delete _fs_handle; - } - } -} +HdfsFileSystem::~HdfsFileSystem() = default; Status HdfsFileSystem::connect_impl() { RETURN_IF_ERROR( @@ -384,10 +376,6 @@ Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_ return local_writer->close(); } -HdfsFileSystemHandle* HdfsFileSystem::get_handle() { - return _fs_handle; -} - // ************* HdfsFileSystemCache ****************** int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64; @@ -406,7 +394,7 @@ Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, const std void HdfsFileSystemCache::_clean_invalid() { std::vector removed_handle; for (auto& item : _cache) { - if (item.second->invalid() && item.second->ref_cnt() == 0) { + if (item.second.use_count() == 1 && item.second->invalid()) { removed_handle.emplace_back(item.first); } } @@ -419,7 +407,7 @@ void HdfsFileSystemCache::_clean_oldest() { uint64_t oldest_time = ULONG_MAX; uint64 oldest = 0; for (auto& item : _cache) { - if (item.second->ref_cnt() == 0 && item.second->last_access_time() < oldest_time) { + if (item.second.use_count() == 1 && item.second->last_access_time() < oldest_time) { oldest_time = item.second->last_access_time(); oldest = item.first; } @@ -429,16 +417,16 @@ void HdfsFileSystemCache::_clean_oldest() { Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params, const std::string& fs_name, - HdfsFileSystemHandle** fs_handle) { + std::shared_ptr* fs_handle) { uint64 hash_code = _hdfs_hash_code(hdfs_params, fs_name); { std::lock_guard l(_lock); auto it = _cache.find(hash_code); if (it != _cache.end()) { - HdfsFileSystemHandle* handle = it->second.get(); + std::shared_ptr handle = it->second; if (!handle->invalid()) { - handle->inc_ref(); - *fs_handle = handle; + handle->update_last_access_time(); + *fs_handle = std::move(handle); return Status::OK(); } // fs handle is invalid, erase it. @@ -455,13 +443,12 @@ Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params, _clean_oldest(); } if (_cache.size() < MAX_CACHE_HANDLE) { - std::unique_ptr handle = - std::make_unique(hdfs_fs, true); - handle->inc_ref(); - *fs_handle = handle.get(); + auto handle = std::make_shared(hdfs_fs, true); + handle->update_last_access_time(); + *fs_handle = handle; _cache[hash_code] = std::move(handle); } else { - *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false); + *fs_handle = std::make_shared(hdfs_fs, false); } } return Status::OK(); diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index db854cafa9edd0..74d098004ab236 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -45,13 +45,11 @@ class HdfsFileSystemHandle { HdfsFileSystemHandle(hdfsFS fs, bool cached) : hdfs_fs(fs), from_cache(cached), - _ref_cnt(0), _create_time(_now()), _last_access_time(0), _invalid(false) {} ~HdfsFileSystemHandle() { - DCHECK(_ref_cnt == 0) << _ref_cnt; if (hdfs_fs != nullptr) { // DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed" // even if we create a new one @@ -62,18 +60,14 @@ class HdfsFileSystemHandle { int64_t last_access_time() { return _last_access_time; } - void inc_ref() { - _ref_cnt++; - _last_access_time = _now(); - } - - void dec_ref() { - _ref_cnt--; - _last_access_time = _now(); + void update_last_access_time() { + if (from_cache) { + _last_access_time = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + } } - int ref_cnt() { return _ref_cnt; } - bool invalid() { return _invalid; } void set_invalid() { _invalid = true; } @@ -84,8 +78,6 @@ class HdfsFileSystemHandle { const bool from_cache; private: - // the number of referenced client - std::atomic _ref_cnt; // For kerberos authentication, we need to save create time so that // we can know if the kerberos ticket is expired. std::atomic _create_time; @@ -109,8 +101,6 @@ class HdfsFileSystem final : public RemoteFileSystem { ~HdfsFileSystem() override; - HdfsFileSystemHandle* get_handle(); - friend class HdfsFileHandleCache; protected: @@ -143,9 +133,7 @@ class HdfsFileSystem final : public RemoteFileSystem { RuntimeProfile* profile); const THdfsParams& _hdfs_params; std::string _fs_name; - // do not use std::shared_ptr or std::unique_ptr - // _fs_handle is managed by HdfsFileSystemCache - HdfsFileSystemHandle* _fs_handle = nullptr; + std::shared_ptr _fs_handle = nullptr; RuntimeProfile* _profile = nullptr; }; } // namespace io