From 8292965686da88c24c2b20220a5f0571f8dfd7d7 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Mon, 18 Aug 2025 14:15:11 +0800 Subject: [PATCH 1/6] [fix](hdfs) remove cached file handle when read fails --- be/src/io/fs/hdfs_file_reader.cpp | 13 +++++++++++-- be/src/io/fs/hdfs_file_reader.h | 3 +++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index c12b998883a119..4b150039852693 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -115,8 +115,17 @@ Status HdfsFileReader::close() { return Status::OK(); } -#ifdef USE_HADOOP_HDFS Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* io_ctx) { + auto st = read_at_impl_impl(offset, result, bytes_read, io_ctx); + if (!st.ok()) { + cache.remove_file_handle(_handle); + } + return st; +} + +#ifdef USE_HADOOP_HDFS +Status HdfsFileReader::read_at_impl_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* /*io_ctx*/) { if (closed()) [[unlikely]] { return Status::InternalError("read closed file: {}", _path.native()); @@ -173,7 +182,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r #else // The hedged read only support hdfsPread(). // TODO: rethink here to see if there are some difference between hdfsPread() and hdfsRead() -Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, +Status HdfsFileReader::read_at_impl_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* /*io_ctx*/) { if (closed()) [[unlikely]] { return Status::InternalError("read closed file: ", _path.native()); diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h index 8ccbe4ade8839a..91631a0f64af57 100644 --- a/be/src/io/fs/hdfs_file_reader.h +++ b/be/src/io/fs/hdfs_file_reader.h @@ -63,6 +63,9 @@ class HdfsFileReader final : public FileReader { void _collect_profile_before_close() override; + Status read_at_impl_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* io_ctx); + private: #ifdef USE_HADOOP_HDFS struct HDFSProfile { From ba462daa27787d77b6469d0587398962256e676b Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Mon, 18 Aug 2025 14:20:25 +0800 Subject: [PATCH 2/6] fix --- be/src/io/fs/hdfs_file_reader.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 4b150039852693..11dbbe400af489 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -46,11 +46,13 @@ bvar::PerSecond> hdfs_read_througthput("hdfs_file_reader", &hdfs_bytes_read_total); namespace { +static FileHandleCache cache(config::max_hdfs_file_handle_cache_num, 16, + config::max_hdfs_file_handle_cache_time_sec); +// Delay remove from cache: do nothing here, let the cache eviction thread handle removal. Result get_file(const hdfsFS& fs, const Path& file, int64_t mtime, int64_t file_size) { - static FileHandleCache cache(config::max_hdfs_file_handle_cache_num, 16, - config::max_hdfs_file_handle_cache_time_sec); + bool cache_hit; FileHandleCache::Accessor accessor; RETURN_IF_ERROR_RESULT(cache.get_file_handle(fs, file.native(), mtime, file_size, false, @@ -117,7 +119,7 @@ Status HdfsFileReader::close() { Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) { - auto st = read_at_impl_impl(offset, result, bytes_read, io_ctx); + auto st = read_at_impl_impl(offset, result, bytes_read, io_ctx); if (!st.ok()) { cache.remove_file_handle(_handle); } @@ -126,7 +128,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r #ifdef USE_HADOOP_HDFS Status HdfsFileReader::read_at_impl_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* /*io_ctx*/) { if (closed()) [[unlikely]] { return Status::InternalError("read closed file: {}", _path.native()); } @@ -183,7 +185,7 @@ Status HdfsFileReader::read_at_impl_impl(size_t offset, Slice result, size_t* by // The hedged read only support hdfsPread(). // TODO: rethink here to see if there are some difference between hdfsPread() and hdfsRead() Status HdfsFileReader::read_at_impl_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* /*io_ctx*/) { if (closed()) [[unlikely]] { return Status::InternalError("read closed file: ", _path.native()); } From 9c799cb90790efadc517a6790ef47cb1a007fc35 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Mon, 18 Aug 2025 14:22:53 +0800 Subject: [PATCH 3/6] fix --- be/src/io/fs/hdfs_file_reader.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 11dbbe400af489..b77459eb9d2ca4 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -52,7 +52,6 @@ static FileHandleCache cache(config::max_hdfs_file_handle_cache_num, 16, Result get_file(const hdfsFS& fs, const Path& file, int64_t mtime, int64_t file_size) { - bool cache_hit; FileHandleCache::Accessor accessor; RETURN_IF_ERROR_RESULT(cache.get_file_handle(fs, file.native(), mtime, file_size, false, @@ -119,7 +118,7 @@ Status HdfsFileReader::close() { Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) { - auto st = read_at_impl_impl(offset, result, bytes_read, io_ctx); + auto st = do_read_at_impl(offset, result, bytes_read, io_ctx); if (!st.ok()) { cache.remove_file_handle(_handle); } @@ -127,8 +126,8 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r } #ifdef USE_HADOOP_HDFS -Status HdfsFileReader::read_at_impl_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { +Status HdfsFileReader::do_read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* /*io_ctx*/) { if (closed()) [[unlikely]] { return Status::InternalError("read closed file: {}", _path.native()); } @@ -184,8 +183,8 @@ Status HdfsFileReader::read_at_impl_impl(size_t offset, Slice result, size_t* by #else // The hedged read only support hdfsPread(). // TODO: rethink here to see if there are some difference between hdfsPread() and hdfsRead() -Status HdfsFileReader::read_at_impl_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { +Status HdfsFileReader::do_read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* /*io_ctx*/) { if (closed()) [[unlikely]] { return Status::InternalError("read closed file: ", _path.native()); } From 769b0d5a0caf5d5b89355638c992bf6cdc5cd7f4 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Mon, 18 Aug 2025 14:50:23 +0800 Subject: [PATCH 4/6] fix --- be/src/io/fs/hdfs_file_reader.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index b77459eb9d2ca4..6df7a826041bf9 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -48,7 +48,6 @@ bvar::PerSecond> hdfs_read_througthput("hdfs_file_reader", namespace { static FileHandleCache cache(config::max_hdfs_file_handle_cache_num, 16, config::max_hdfs_file_handle_cache_time_sec); -// Delay remove from cache: do nothing here, let the cache eviction thread handle removal. Result get_file(const hdfsFS& fs, const Path& file, int64_t mtime, int64_t file_size) { @@ -120,7 +119,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r const IOContext* io_ctx) { auto st = do_read_at_impl(offset, result, bytes_read, io_ctx); if (!st.ok()) { - cache.remove_file_handle(_handle); + _accessor.destroy(); } return st; } From 0ec7ca055d6f07d3e007564b00ea3349cb71053e Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Mon, 18 Aug 2025 15:04:50 +0800 Subject: [PATCH 5/6] fix --- be/src/io/fs/hdfs_file_reader.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h index 91631a0f64af57..8556eea0de6ac5 100644 --- a/be/src/io/fs/hdfs_file_reader.h +++ b/be/src/io/fs/hdfs_file_reader.h @@ -63,8 +63,8 @@ class HdfsFileReader final : public FileReader { void _collect_profile_before_close() override; - Status read_at_impl_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* io_ctx); + Status do_read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const IOContext* io_ctx); private: #ifdef USE_HADOOP_HDFS From f1850b32ce8961d03c51947662dbe6ab4c3c5fab Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Mon, 18 Aug 2025 22:50:39 +0800 Subject: [PATCH 6/6] fix --- be/src/io/fs/hdfs_file_reader.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 6df7a826041bf9..cb8b71f67d7960 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -46,11 +46,11 @@ bvar::PerSecond> hdfs_read_througthput("hdfs_file_reader", &hdfs_bytes_read_total); namespace { -static FileHandleCache cache(config::max_hdfs_file_handle_cache_num, 16, - config::max_hdfs_file_handle_cache_time_sec); Result get_file(const hdfsFS& fs, const Path& file, int64_t mtime, int64_t file_size) { + static FileHandleCache cache(config::max_hdfs_file_handle_cache_num, 16, + config::max_hdfs_file_handle_cache_time_sec); bool cache_hit; FileHandleCache::Accessor accessor; RETURN_IF_ERROR_RESULT(cache.get_file_handle(fs, file.native(), mtime, file_size, false,