From b2b3027b5d4e28307be23facb767dbb959967c4c Mon Sep 17 00:00:00 2001 From: kaka11chen Date: Mon, 22 Apr 2024 14:38:03 +0800 Subject: [PATCH 1/2] [Fix](hdfs-writer) Fix hdfs file writer core with `check failed: _ref_cnt == 0` in dctor of `HdfsFileWriter`. --- be/src/io/file_factory.cpp | 10 +++------- be/src/io/fs/hdfs_file_system.cpp | 4 ---- be/src/io/fs/hdfs_file_system.h | 4 +--- be/src/io/fs/hdfs_file_writer.cpp | 6 ++---- be/src/io/fs/hdfs_file_writer.h | 9 +++++---- be/src/io/hdfs_util.cpp | 12 ++++++------ be/src/io/hdfs_util.h | 4 ++-- 7 files changed, 19 insertions(+), 30 deletions(-) diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 2ca32cbacaf352..5655b4392bf695 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -123,14 +123,10 @@ Result FileFactory::create_file_writer( } case TFileType::FILE_HDFS: { THdfsParams hdfs_params = parse_properties(properties); - io::HdfsHandler* handler; + std::shared_ptr handler; RETURN_IF_ERROR_RESULT(io::HdfsHandlerCache::instance()->get_connection( hdfs_params, hdfs_params.fs_name, &handler)); - auto res = io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options); - if (!res.has_value()) { - handler->dec_ref(); - } - return res; + return io::HdfsFileWriter::create(path, handler, hdfs_params.fs_name, &options); } default: return ResultError( @@ -165,7 +161,7 @@ Result FileFactory::create_file_reader( }); } case TFileType::FILE_HDFS: { - io::HdfsHandler* handler; + std::shared_ptr handler; // FIXME(plat1ko): Explain the difference between `system_properties.hdfs_params.fs_name` // and `file_description.fs_name`, it's so confused. const auto* fs_name = &file_description.fs_name; diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index f86b3ef588afb3..9bd8fcefcfb050 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -90,8 +90,6 @@ HdfsFileSystem::~HdfsFileSystem() { if (_fs_handle != nullptr) { if (_fs_handle->from_cache) { _fs_handle->dec_ref(); - } else { - delete _fs_handle; } } } @@ -107,13 +105,11 @@ Status HdfsFileSystem::init() { Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, const FileWriterOptions* opts) { - _fs_handle->inc_ref(); auto res = io::HdfsFileWriter::create(file, _fs_handle, _fs_name, opts); if (res.has_value()) { *writer = std::move(res).value(); return Status::OK(); } else { - _fs_handle->dec_ref(); return std::move(res).error(); } } diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index 23ae65b0820ef4..f7c9ea223151a9 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -88,9 +88,7 @@ class HdfsFileSystem final : public RemoteFileSystem { RuntimeProfile* profile, std::string root_path); const THdfsParams& _hdfs_params; // Only used in init, so we can use reference here std::string _fs_name; - // do not use std::shared_ptr or std::unique_ptr - // _fs_handle is managed by HdfsFileSystemCache - HdfsHandler* _fs_handle = nullptr; + std::shared_ptr _fs_handle = nullptr; RuntimeProfile* _profile = nullptr; }; } // namespace io diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index b47e4c3f2d2ebe..56c5509f7b4be8 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -45,7 +45,7 @@ bvar::Adder hdfs_file_being_written("hdfs_file_writer_file_being_writt static constexpr size_t MB = 1024 * 1024; -HdfsFileWriter::HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, +HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr handler, hdfsFile hdfs_file, std::string fs_name, const FileWriterOptions* opts) : _path(std::move(path)), _hdfs_handler(handler), @@ -72,8 +72,6 @@ HdfsFileWriter::~HdfsFileWriter() { if (_hdfs_handler->from_cache) { _hdfs_handler->dec_ref(); - } else { - delete _hdfs_handler; } hdfs_file_being_written << -1; } @@ -266,7 +264,7 @@ Status HdfsFileWriter::finalize() { return Status::OK(); } -Result HdfsFileWriter::create(Path full_path, HdfsHandler* handler, +Result HdfsFileWriter::create(Path full_path, std::shared_ptr handler, const std::string& fs_name, const FileWriterOptions* opts) { auto path = convert_path(full_path, fs_name); diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index e6aa623cada162..f6fa66c5dbed26 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -36,11 +36,12 @@ class HdfsFileWriter final : public FileWriter { // - fs_name/path_to_file // - /path_to_file // TODO(plat1ko): Support related path for cloud mode - static Result create(Path path, HdfsHandler* handler, const std::string& fs_name, + static Result create(Path path, std::shared_ptr handler, + const std::string& fs_name, const FileWriterOptions* opts = nullptr); - HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file, std::string fs_name, - const FileWriterOptions* opts = nullptr); + HdfsFileWriter(Path path, std::shared_ptr handler, hdfsFile hdfs_file, + std::string fs_name, const FileWriterOptions* opts = nullptr); ~HdfsFileWriter() override; Status close() override; @@ -59,7 +60,7 @@ class HdfsFileWriter final : public FileWriter { Status _append(std::string_view content); Path _path; - HdfsHandler* _hdfs_handler = nullptr; + std::shared_ptr _hdfs_handler = nullptr; hdfsFile _hdfs_file = nullptr; std::string _fs_name; size_t _bytes_appended = 0; diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp index 0ae5d2f371b540..157cf610700ecc 100644 --- a/be/src/io/hdfs_util.cpp +++ b/be/src/io/hdfs_util.cpp @@ -103,16 +103,16 @@ void HdfsHandlerCache::_clean_oldest() { } Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const std::string& fs_name, - HdfsHandler** fs_handle) { + std::shared_ptr* fs_handle) { uint64 hash_code = hdfs_hash_code(hdfs_params); { std::lock_guard l(_lock); auto it = _cache.find(hash_code); if (it != _cache.end()) { - HdfsHandler* handle = it->second.get(); + std::shared_ptr handle = it->second; if (!handle->invalid()) { handle->inc_ref(); - *fs_handle = handle; + *fs_handle = std::move(handle); return Status::OK(); } // fs handle is invalid, erase it. @@ -129,12 +129,12 @@ Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const st _clean_oldest(); } if (_cache.size() < MAX_CACHE_HANDLE) { - std::unique_ptr handle = std::make_unique(hdfs_fs, true); + std::shared_ptr handle = std::make_shared(hdfs_fs, true); handle->inc_ref(); - *fs_handle = handle.get(); + *fs_handle = handle; _cache[hash_code] = std::move(handle); } else { - *fs_handle = new HdfsHandler(hdfs_fs, false); + *fs_handle = std::make_shared(hdfs_fs, false); } } return Status::OK(); diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h index f450063c7dc149..6843caa5d55a06 100644 --- a/be/src/io/hdfs_util.h +++ b/be/src/io/hdfs_util.h @@ -118,13 +118,13 @@ class HdfsHandlerCache { // This function is thread-safe Status get_connection(const THdfsParams& hdfs_params, const std::string& fs_name, - HdfsHandler** fs_handle); + std::shared_ptr* fs_handle); private: static constexpr int MAX_CACHE_HANDLE = 64; std::mutex _lock; - std::unordered_map> _cache; + std::unordered_map> _cache; HdfsHandlerCache() = default; From c95dd699c71fd756e1728ad9ce523a637b17ad18 Mon Sep 17 00:00:00 2001 From: kaka11chen Date: Tue, 23 Apr 2024 12:29:07 +0800 Subject: [PATCH 2/2] Use shared_ptr in the cache and fix comments. --- be/src/io/fs/hdfs_file_system.cpp | 8 +------- be/src/io/fs/hdfs_file_writer.cpp | 5 +---- be/src/io/hdfs_util.cpp | 10 +++++----- be/src/io/hdfs_util.h | 19 ++++++------------- 4 files changed, 13 insertions(+), 29 deletions(-) diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 9bd8fcefcfb050..24976f11941e3a 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -86,13 +86,7 @@ HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_na } } -HdfsFileSystem::~HdfsFileSystem() { - if (_fs_handle != nullptr) { - if (_fs_handle->from_cache) { - _fs_handle->dec_ref(); - } - } -} +HdfsFileSystem::~HdfsFileSystem() = default; Status HdfsFileSystem::init() { RETURN_IF_ERROR( diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index 56c5509f7b4be8..2444ab3d1f57da 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -48,7 +48,7 @@ static constexpr size_t MB = 1024 * 1024; HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr handler, hdfsFile hdfs_file, std::string fs_name, const FileWriterOptions* opts) : _path(std::move(path)), - _hdfs_handler(handler), + _hdfs_handler(std::move(handler)), _hdfs_file(hdfs_file), _fs_name(std::move(fs_name)), _sync_file_data(opts ? opts->sync_file_data : true), @@ -70,9 +70,6 @@ HdfsFileWriter::~HdfsFileWriter() { hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); } - if (_hdfs_handler->from_cache) { - _hdfs_handler->dec_ref(); - } hdfs_file_being_written << -1; } diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp index 157cf610700ecc..a9563a821395a7 100644 --- a/be/src/io/hdfs_util.cpp +++ b/be/src/io/hdfs_util.cpp @@ -81,7 +81,7 @@ bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync"); void HdfsHandlerCache::_clean_invalid() { std::vector removed_handle; for (auto& item : _cache) { - if (item.second->invalid() && item.second->ref_cnt() == 0) { + if (item.second.use_count() == 1 && item.second->invalid()) { removed_handle.emplace_back(item.first); } } @@ -94,7 +94,7 @@ void HdfsHandlerCache::_clean_oldest() { uint64_t oldest_time = ULONG_MAX; uint64 oldest = 0; for (auto& item : _cache) { - if (item.second->ref_cnt() == 0 && item.second->last_access_time() < oldest_time) { + if (item.second.use_count() == 1 && item.second->last_access_time() < oldest_time) { oldest_time = item.second->last_access_time(); oldest = item.first; } @@ -111,7 +111,7 @@ Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const st if (it != _cache.end()) { std::shared_ptr handle = it->second; if (!handle->invalid()) { - handle->inc_ref(); + handle->update_last_access_time(); *fs_handle = std::move(handle); return Status::OK(); } @@ -129,8 +129,8 @@ Status HdfsHandlerCache::get_connection(const THdfsParams& hdfs_params, const st _clean_oldest(); } if (_cache.size() < MAX_CACHE_HANDLE) { - std::shared_ptr handle = std::make_shared(hdfs_fs, true); - handle->inc_ref(); + auto handle = std::make_shared(hdfs_fs, true); + handle->update_last_access_time(); *fs_handle = handle; _cache[hash_code] = std::move(handle); } else { diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h index 6843caa5d55a06..c8d25d1f30aa4f 100644 --- a/be/src/io/hdfs_util.h +++ b/be/src/io/hdfs_util.h @@ -54,7 +54,6 @@ class HdfsHandler { HdfsHandler(hdfsFS fs, bool cached) : hdfs_fs(fs), from_cache(cached), - _ref_cnt(0), _create_time(std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count()), @@ -62,7 +61,6 @@ class HdfsHandler { _invalid(false) {} ~HdfsHandler() { - DCHECK(_ref_cnt == 0); if (hdfs_fs != nullptr) { // DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed" // even if we create a new one @@ -73,17 +71,14 @@ class HdfsHandler { int64_t last_access_time() { return _last_access_time; } - void inc_ref() { - _ref_cnt++; - _last_access_time = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + void update_last_access_time() { + if (from_cache) { + _last_access_time = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + } } - void dec_ref() { _ref_cnt--; } - - int ref_cnt() { return _ref_cnt; } - bool invalid() { return _invalid; } void set_invalid() { _invalid = true; } @@ -94,8 +89,6 @@ class HdfsHandler { const bool from_cache; private: - // the number of referenced client - std::atomic _ref_cnt; // For kerberos authentication, we need to save create time so that // we can know if the kerberos ticket is expired. std::atomic _create_time;