From 1a8527cb19aae4646aa4f88364dea8d1a1be3d12 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Thu, 6 Mar 2025 23:46:25 +0800 Subject: [PATCH 1/7] [fix](cloud) retry read_at when corruption using file cache Signed-off-by: zhengyu --- be/src/cloud/injection_point_action.cpp | 8 ++++ be/src/olap/rowset/segment_v2/page_io.cpp | 51 +++++++++++++++++++++++ be/src/olap/rowset/segment_v2/page_io.h | 5 +++ 3 files changed, 64 insertions(+) diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp index bc6676313c1717..f3d1e89de25265 100644 --- a/be/src/cloud/injection_point_action.cpp +++ b/be/src/cloud/injection_point_action.cpp @@ -133,6 +133,14 @@ void register_suites() { *arg0 = Status::Corruption("test_file_segment_cache_corruption injection error"); }); }); + suite_map.emplace("test_page_io_checksum_mismatch", [] { + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("PageIO::read_and_decompress_page_:checksum_mismatch", [](auto&& args) { + LOG(INFO) << "injection PageIO::read_and_decompress_page_:checksum_mismatch"; + auto* arg0 = try_any_cast(args[0]); + *arg0 = 0; + }); + }); // curl be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn' suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] { auto* sp = SyncPoint::get_instance(); diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index 07d5656ee8a44b..365a26d0f33cff 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -27,8 +27,12 @@ #include #include +#include "cloud/config.h" #include "common/logging.h" +#include "cpp/sync_point.h" #include "gutil/strings/substitute.h" +#include "io/cache/block_file_cache.h" +#include "io/cache/block_file_cache_factory.h" #include "io/fs/file_reader.h" #include "io/fs/file_writer.h" #include "olap/olap_common.h" @@ -159,6 +163,8 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle if (opts.verify_checksum) { uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4); uint32_t actual = crc32c::Value(page_slice.data, page_slice.size - 4); + TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page_:checksum_mismatch", + &actual); if (expect != actual) { return Status::Corruption( "Bad page: checksum mismatch (actual={} vs expect={}), file={}", actual, expect, @@ -229,5 +235,50 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle return Status::OK(); } +io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) { + std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0 + return io::BlockFileCache::hash(base); +} + +std::string file_cache_key_str(const std::string& seg_path) { + return file_cache_key_from_path(seg_path).to_string(); +} + +Status PageIO::read_and_decompress_page_with_file_cache_retry(const PageReadOptions& opts, + PageHandle* handle, Slice* body, + PageFooterPB* footer) { + // First try to read with file cache + Status st = read_and_decompress_page(opts, handle, body, footer); + if (!st.is() || !config::is_cloud_mode()) { + return st; + } + + // If we get CORRUPTION error and using file cache, clear cache and retry + LOG(WARNING) << "Bad page may be read from file cache, try to read remote source directly, " + << "file path: " << opts.file_reader->path().native() + << " offset: " << opts.page_pointer.offset; + + // Remove cache if exists + const std::string path = opts.file_reader->path().string(); + auto file_key = file_cache_key_from_path(path); + auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); + if (file_cache) { + file_cache->remove_if_cached(file_key); + } + + // Retry with file cache + st = read_and_decompress_page(opts, handle, body, footer); + if (st.is()) { + LOG(WARNING) << "Corruption again to read remote source file during retry, " + << "file path: " << opts.file_reader->path().native() + << " offset: " << opts.page_pointer.offset; + } + // TODO(zhengyu): If still get CORRUPTION error, try to read without cache + // but it is hard to implement in PageIO level: file_reader is already + // opened as CachedRemoteFileReader and it will bring so much complexity + // if we reopen it as RemoteFileReader. So currenty, we just let it fall + return st; +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index b23af4b0b350e5..9f5baf147794e0 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -130,6 +130,11 @@ class PageIO { { return read_and_decompress_page_(opts, handle, body, footer); }); } + // deal with CORRUPTION when using file cache, retry from remote + static Status read_and_decompress_page_with_file_cache_retry(const PageReadOptions& opts, + PageHandle* handle, Slice* body, + PageFooterPB* footer); + private: // An internal method that not deal with exception. static Status read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle, From c41c27ae4e7ca1fced9dbc5ee11fbb1ebc4597ca Mon Sep 17 00:00:00 2001 From: zhengyu Date: Fri, 7 Mar 2025 01:50:26 +0800 Subject: [PATCH 2/7] enhancement Signed-off-by: zhengyu --- be/src/cloud/injection_point_action.cpp | 24 ++++++-- .../segment_v2/indexed_column_reader.cpp | 2 +- .../rowset/segment_v2/ordinal_page_index.cpp | 3 +- be/src/olap/rowset/segment_v2/page_io.cpp | 59 ++++++++++++------- be/src/olap/rowset/segment_v2/page_io.h | 25 +++++++- be/src/olap/rowset/segment_v2/segment.cpp | 13 +--- 6 files changed, 86 insertions(+), 40 deletions(-) diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp index f3d1e89de25265..daaaf0ce1c8c1a 100644 --- a/be/src/cloud/injection_point_action.cpp +++ b/be/src/cloud/injection_point_action.cpp @@ -27,7 +27,9 @@ #include "http/http_channel.h" #include "http/http_request.h" #include "http/http_status.h" +#include "io/cache/cached_remote_file_reader.h" #include "olap/rowset/rowset.h" +#include "olap/rowset/segment_v2/page_io.h" #include "util/stack_util.h" namespace doris { @@ -133,12 +135,24 @@ void register_suites() { *arg0 = Status::Corruption("test_file_segment_cache_corruption injection error"); }); }); - suite_map.emplace("test_page_io_checksum_mismatch", [] { + // curl "be_ip:http_port/api/injection_point/apply_suite/PageIO::read_and_decompress_page:crc_failure" + suite_map.emplace("PageIO::read_and_decompress_page:crc_failure", [] { auto* sp = SyncPoint::get_instance(); - sp->set_call_back("PageIO::read_and_decompress_page_:checksum_mismatch", [](auto&& args) { - LOG(INFO) << "injection PageIO::read_and_decompress_page_:checksum_mismatch"; - auto* arg0 = try_any_cast(args[0]); - *arg0 = 0; + sp->set_call_back("PageIO::read_and_decompress_page:crc_failure_inj", [](auto&& args) { + LOG(INFO) << "PageIO::read_and_decompress_page:crc_failure_inj"; + if (auto ctx = std::any_cast(args[0])) { + uint32_t* crc = ctx->crc; + segment_v2::PageReadOptions* opts = ctx->opts; + auto cached_file_reader = + dynamic_cast(opts->file_reader); + if (cached_file_reader == nullptr) { + return; // if not cachedreader, then do nothing + } else { + memset(crc, 0, 32); + } + } else { + std::cerr << "Failed to cast std::any to InjectionContext*" << std::endl; + } }); }); // curl be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn' diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index 3f582293ee4d7f..a6239f34c012d2 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -139,7 +139,7 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, if (_is_pk_index) { opts.type = PRIMARY_KEY_INDEX_PAGE; } - auto st = PageIO::read_and_decompress_page(opts, handle, body, footer); + auto st = PageIO::read_and_decompress_page_with_file_cache_retry(opts, handle, body, footer); g_index_reader_compressed_bytes << pp.size; g_index_reader_bytes << footer->uncompressed_size(); g_index_reader_pages << 1; diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp index 4995e779892646..35922a900e22cd 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp @@ -108,7 +108,8 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory, PageHandle page_handle; Slice body; PageFooterPB footer; - RETURN_IF_ERROR(PageIO::read_and_decompress_page(opts, &page_handle, &body, &footer)); + RETURN_IF_ERROR(PageIO::read_and_decompress_page_with_file_cache_retry(opts, &page_handle, + &body, &footer)); // parse and save all (ordinal, pp) from index page IndexPageReader reader; diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index 365a26d0f33cff..4a3dbec93454c7 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -33,6 +33,7 @@ #include "gutil/strings/substitute.h" #include "io/cache/block_file_cache.h" #include "io/cache/block_file_cache_factory.h" +#include "io/cache/cached_remote_file_reader.h" #include "io/fs/file_reader.h" #include "io/fs/file_writer.h" #include "olap/olap_common.h" @@ -115,6 +116,15 @@ Status PageIO::write_page(io::FileWriter* writer, const std::vector& body return Status::OK(); } +io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) { + std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0 + return io::BlockFileCache::hash(base); +} + +std::string file_cache_key_str(const std::string& seg_path) { + return file_cache_key_from_path(seg_path).to_string(); +} + Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle, Slice* body, PageFooterPB* footer) { opts.sanity_check(); @@ -163,8 +173,9 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle if (opts.verify_checksum) { uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4); uint32_t actual = crc32c::Value(page_slice.data, page_slice.size - 4); - TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page_:checksum_mismatch", - &actual); + InjectionContext ctx = {&actual, const_cast(&opts)}; + (void)ctx; + TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj", &ctx); if (expect != actual) { return Status::Corruption( "Bad page: checksum mismatch (actual={} vs expect={}), file={}", actual, expect, @@ -235,15 +246,6 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle return Status::OK(); } -io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) { - std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0 - return io::BlockFileCache::hash(base); -} - -std::string file_cache_key_str(const std::string& seg_path) { - return file_cache_key_from_path(seg_path).to_string(); -} - Status PageIO::read_and_decompress_page_with_file_cache_retry(const PageReadOptions& opts, PageHandle* handle, Slice* body, PageFooterPB* footer) { @@ -253,9 +255,15 @@ Status PageIO::read_and_decompress_page_with_file_cache_retry(const PageReadOpti return st; } + auto* cached_file_reader = dynamic_cast(opts.file_reader); + if (cached_file_reader == nullptr) { + return st; + } + // If we get CORRUPTION error and using file cache, clear cache and retry - LOG(WARNING) << "Bad page may be read from file cache, try to read remote source directly, " - << "file path: " << opts.file_reader->path().native() + LOG(WARNING) << "Bad page may be read from file cache, need retry." + << " error msg: " << st.msg() + << " file path: " << opts.file_reader->path().native() << " offset: " << opts.page_pointer.offset; // Remove cache if exists @@ -268,15 +276,24 @@ Status PageIO::read_and_decompress_page_with_file_cache_retry(const PageReadOpti // Retry with file cache st = read_and_decompress_page(opts, handle, body, footer); - if (st.is()) { - LOG(WARNING) << "Corruption again to read remote source file during retry, " - << "file path: " << opts.file_reader->path().native() - << " offset: " << opts.page_pointer.offset; + if (!st.is()) { + return st; + } + + LOG(WARNING) << "Corruption again with retry downloading cache," + << " error msg: " << st.msg() + << " file path: " << opts.file_reader->path().native() + << " offset: " << opts.page_pointer.offset; + + PageReadOptions new_opts = opts; + new_opts.file_reader = cached_file_reader->get_remote_reader(); + st = read_and_decompress_page(new_opts, handle, body, footer); + if (!st.ok()) { + LOG(WARNING) << "Corruption again with retry read directly from remote," + << " error msg: " << st.msg() + << " file path: " << opts.file_reader->path().native() + << " offset: " << opts.page_pointer.offset << " Give up."; } - // TODO(zhengyu): If still get CORRUPTION error, try to read without cache - // but it is hard to implement in PageIO level: file_reader is already - // opened as CachedRemoteFileReader and it will bring so much complexity - // if we reopen it as RemoteFileReader. So currenty, we just let it fall return st; } diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index 9f5baf147794e0..4376737e9e72cf 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -23,6 +23,7 @@ #include "common/logging.h" #include "common/status.h" +#include "io/cache/block_file_cache.h" #include "io/io_common.h" #include "olap/rowset/segment_v2/page_pointer.h" #include "util/slice.h" @@ -41,6 +42,9 @@ namespace segment_v2 { class EncodingInfo; class PageHandle; +io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path); +std::string file_cache_key_str(const std::string& seg_path); + struct PageReadOptions { // whether to verify page checksum bool verify_checksum = true; @@ -66,12 +70,31 @@ struct PageReadOptions { const EncodingInfo* encoding_info = nullptr; - const io::IOContext& io_ctx; + io::IOContext& io_ctx; void sanity_check() const { CHECK_NOTNULL(file_reader); CHECK_NOTNULL(stats); } + PageReadOptions() = delete; + + PageReadOptions(const PageReadOptions& old) : io_ctx(old.io_ctx) { + file_reader = old.file_reader; + page_pointer = old.page_pointer; + codec = old.codec; + stats = old.stats; + verify_checksum = old.verify_checksum; + use_page_cache = old.use_page_cache; + kept_in_memory = old.kept_in_memory; + type = old.type; + encoding_info = old.encoding_info; + pre_decode = old.pre_decode; + } +}; + +struct InjectionContext { + uint32_t* crc; + PageReadOptions* opts; }; inline std::ostream& operator<<(std::ostream& os, const PageReadOptions& opt) { diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index d51e07c9087842..9f22510d870af1 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -78,15 +78,6 @@ namespace doris::segment_v2 { class InvertedIndexIterator; -io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) { - std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0 - return io::BlockFileCache::hash(base); -} - -std::string file_cache_key_str(const std::string& seg_path) { - return file_cache_key_from_path(seg_path).to_string(); -} - Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id, uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, const io::FileReaderOptions& reader_options, std::shared_ptr* output, @@ -533,8 +524,8 @@ Status Segment::load_index(OlapReaderStatistics* stats) { }; Slice body; PageFooterPB footer; - RETURN_IF_ERROR( - PageIO::read_and_decompress_page(opts, &_sk_index_handle, &body, &footer)); + RETURN_IF_ERROR(PageIO::read_and_decompress_page_with_file_cache_retry( + opts, &_sk_index_handle, &body, &footer)); DCHECK_EQ(footer.type(), SHORT_KEY_PAGE); DCHECK(footer.has_short_key_page_footer()); From afc3b484be68e3ed509e57352c625ebccb58870b Mon Sep 17 00:00:00 2001 From: zhengyu Date: Fri, 7 Mar 2025 02:43:03 +0800 Subject: [PATCH 3/7] minor fix Signed-off-by: zhengyu --- .../olap/rowset/segment_v2/column_reader.cpp | 25 +++++++++---------- .../segment_v2/indexed_column_reader.cpp | 25 +++++++++---------- .../rowset/segment_v2/ordinal_page_index.cpp | 24 ++++++++---------- be/src/olap/rowset/segment_v2/page_io.h | 4 +-- be/src/olap/rowset/segment_v2/segment.cpp | 21 ++++++++-------- 5 files changed, 47 insertions(+), 52 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index 9948e7fd8cf472..ffe4d31f2825f4 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -355,21 +355,20 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag PageHandle* handle, Slice* page_body, PageFooterPB* footer, BlockCompressionCodec* codec) const { iter_opts.sanity_check(); - PageReadOptions opts { - .verify_checksum = _opts.verify_checksum, - .use_page_cache = iter_opts.use_page_cache, - .kept_in_memory = _opts.kept_in_memory, - .type = iter_opts.type, - .file_reader = iter_opts.file_reader, - .page_pointer = pp, - .codec = codec, - .stats = iter_opts.stats, - .encoding_info = _encoding_info, - .io_ctx = iter_opts.io_ctx, - }; + PageReadOptions opts(iter_opts.io_ctx); + opts.verify_checksum = _opts.verify_checksum; + opts.use_page_cache = iter_opts.use_page_cache; + opts.kept_in_memory = _opts.kept_in_memory; + opts.type = iter_opts.type; + opts.file_reader = iter_opts.file_reader; + opts.page_pointer = pp; + opts.codec = codec; + opts.stats = iter_opts.stats; + opts.encoding_info = _encoding_info; + // index page should not pre decode if (iter_opts.type == INDEX_PAGE) opts.pre_decode = false; - return PageIO::read_and_decompress_page(opts, handle, page_body, footer); + return PageIO::read_and_decompress_page_with_file_cache_retry(opts, handle, page_body, footer); } Status ColumnReader::get_row_ranges_by_zone_map( diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index a6239f34c012d2..bcda3fbadf5839 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -123,19 +123,18 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, OlapReaderStatistics* stats) const { OlapReaderStatistics tmp_stats; OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats; - PageReadOptions opts { - .use_page_cache = _use_page_cache, - .kept_in_memory = _kept_in_memory, - .pre_decode = pre_decode, - .type = type, - .file_reader = _file_reader.get(), - .page_pointer = pp, - .codec = codec, - .stats = stats_ptr, - .encoding_info = _encoding_info, - .io_ctx = io::IOContext {.is_index_data = true, - .file_cache_stats = &stats_ptr->file_cache_stats}, - }; + PageReadOptions opts(io::IOContext {.is_index_data = true, + .file_cache_stats = &stats_ptr->file_cache_stats}); + opts.use_page_cache = _use_page_cache; + opts.kept_in_memory = _kept_in_memory; + opts.pre_decode = pre_decode; + opts.type = type; + opts.file_reader = _file_reader.get(); + opts.page_pointer = pp; + opts.codec = codec; + opts.stats = stats_ptr; + opts.encoding_info = _encoding_info; + if (_is_pk_index) { opts.type = PRIMARY_KEY_INDEX_PAGE; } diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp index 35922a900e22cd..e66c4b2604cd38 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp @@ -91,19 +91,17 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory, // need to read index page OlapReaderStatistics tmp_stats; OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats; - PageReadOptions opts { - .use_page_cache = use_page_cache, - .kept_in_memory = kept_in_memory, - .type = INDEX_PAGE, - .file_reader = _file_reader.get(), - .page_pointer = PagePointer(index_meta->root_page().root_page()), - // ordinal index page uses NO_COMPRESSION right now - .codec = nullptr, - .stats = stats_ptr, - .io_ctx = io::IOContext {.is_index_data = true, - .file_cache_stats = &stats_ptr->file_cache_stats}, - }; - + PageReadOptions opts(io::IOContext {.is_index_data = true, + .file_cache_stats = &stats_ptr->file_cache_stats}); + opts.use_page_cache = use_page_cache; + opts.kept_in_memory = kept_in_memory; + opts.type = INDEX_PAGE; + opts.file_reader = _file_reader.get(); + opts.page_pointer = PagePointer(index_meta->root_page().root_page()); + // ordinal index page uses NO_COMPRESSION right now + opts.codec = nullptr; + opts.stats = stats_ptr; + // read index page PageHandle page_handle; Slice body; diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index 4376737e9e72cf..2c6574b8503844 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -70,13 +70,13 @@ struct PageReadOptions { const EncodingInfo* encoding_info = nullptr; - io::IOContext& io_ctx; + const io::IOContext& io_ctx; void sanity_check() const { CHECK_NOTNULL(file_reader); CHECK_NOTNULL(stats); } - PageReadOptions() = delete; + PageReadOptions(const io::IOContext& ioctx):io_ctx(ioctx) {} PageReadOptions(const PageReadOptions& old) : io_ctx(old.io_ctx) { file_reader = old.file_reader; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 9f22510d870af1..0a9f65adbb8363 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -511,17 +511,16 @@ Status Segment::load_index(OlapReaderStatistics* stats) { // read and parse short key index page OlapReaderStatistics tmp_stats; OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats; - PageReadOptions opts { - .use_page_cache = true, - .type = INDEX_PAGE, - .file_reader = _file_reader.get(), - .page_pointer = PagePointer(_sk_index_page), - // short key index page uses NO_COMPRESSION for now - .codec = nullptr, - .stats = &tmp_stats, - .io_ctx = io::IOContext {.is_index_data = true, - .file_cache_stats = &stats_ptr->file_cache_stats}, - }; + PageReadOptions opts(io::IOContext {.is_index_data = true, + .file_cache_stats = &stats_ptr->file_cache_stats}); + opts.use_page_cache = true; + opts.type = INDEX_PAGE; + opts.file_reader = _file_reader.get(); + opts.page_pointer = PagePointer(_sk_index_page); + // short key index page uses NO_COMPRESSION for now + opts.codec = nullptr; + opts.stats = &tmp_stats; + Slice body; PageFooterPB footer; RETURN_IF_ERROR(PageIO::read_and_decompress_page_with_file_cache_retry( From 12d93a9bdaf2ffcdf616f0ca75b8b698e7d5b717 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Fri, 7 Mar 2025 02:49:08 +0800 Subject: [PATCH 4/7] format Signed-off-by: zhengyu --- be/src/cloud/injection_point_action.cpp | 2 +- be/src/olap/rowset/segment_v2/indexed_column_reader.cpp | 4 ++-- be/src/olap/rowset/segment_v2/ordinal_page_index.cpp | 4 ++-- be/src/olap/rowset/segment_v2/page_io.h | 2 +- be/src/olap/rowset/segment_v2/segment.cpp | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp index daaaf0ce1c8c1a..4fce2609d8eae8 100644 --- a/be/src/cloud/injection_point_action.cpp +++ b/be/src/cloud/injection_point_action.cpp @@ -140,7 +140,7 @@ void register_suites() { auto* sp = SyncPoint::get_instance(); sp->set_call_back("PageIO::read_and_decompress_page:crc_failure_inj", [](auto&& args) { LOG(INFO) << "PageIO::read_and_decompress_page:crc_failure_inj"; - if (auto ctx = std::any_cast(args[0])) { + if (auto ctx = std::any_cast(args[0])) { uint32_t* crc = ctx->crc; segment_v2::PageReadOptions* opts = ctx->opts; auto cached_file_reader = diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index bcda3fbadf5839..d48ea721f1d69c 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -124,7 +124,7 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, OlapReaderStatistics tmp_stats; OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats; PageReadOptions opts(io::IOContext {.is_index_data = true, - .file_cache_stats = &stats_ptr->file_cache_stats}); + .file_cache_stats = &stats_ptr->file_cache_stats}); opts.use_page_cache = _use_page_cache; opts.kept_in_memory = _kept_in_memory; opts.pre_decode = pre_decode; @@ -134,7 +134,7 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, opts.codec = codec; opts.stats = stats_ptr; opts.encoding_info = _encoding_info; - + if (_is_pk_index) { opts.type = PRIMARY_KEY_INDEX_PAGE; } diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp index e66c4b2604cd38..4e1dc7130305cf 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp @@ -92,7 +92,7 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory, OlapReaderStatistics tmp_stats; OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats; PageReadOptions opts(io::IOContext {.is_index_data = true, - .file_cache_stats = &stats_ptr->file_cache_stats}); + .file_cache_stats = &stats_ptr->file_cache_stats}); opts.use_page_cache = use_page_cache; opts.kept_in_memory = kept_in_memory; opts.type = INDEX_PAGE; @@ -101,7 +101,7 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory, // ordinal index page uses NO_COMPRESSION right now opts.codec = nullptr; opts.stats = stats_ptr; - + // read index page PageHandle page_handle; Slice body; diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index 2c6574b8503844..65f0bba3410f05 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -76,7 +76,7 @@ struct PageReadOptions { CHECK_NOTNULL(file_reader); CHECK_NOTNULL(stats); } - PageReadOptions(const io::IOContext& ioctx):io_ctx(ioctx) {} + PageReadOptions(const io::IOContext& ioctx) : io_ctx(ioctx) {} PageReadOptions(const PageReadOptions& old) : io_ctx(old.io_ctx) { file_reader = old.file_reader; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 0a9f65adbb8363..bc9838d98544ab 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -512,7 +512,7 @@ Status Segment::load_index(OlapReaderStatistics* stats) { OlapReaderStatistics tmp_stats; OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats; PageReadOptions opts(io::IOContext {.is_index_data = true, - .file_cache_stats = &stats_ptr->file_cache_stats}); + .file_cache_stats = &stats_ptr->file_cache_stats}); opts.use_page_cache = true; opts.type = INDEX_PAGE; opts.file_reader = _file_reader.get(); From 763aacaa3bb6058ef5b83f701b5e77b10206e835 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Wed, 12 Mar 2025 17:54:01 +0800 Subject: [PATCH 5/7] fix cloud_p0 Signed-off-by: zhengyu --- be/src/olap/rowset/segment_v2/page_io.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index 65f0bba3410f05..274d9b4b35c9d0 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -70,7 +70,7 @@ struct PageReadOptions { const EncodingInfo* encoding_info = nullptr; - const io::IOContext& io_ctx; + const io::IOContext io_ctx; void sanity_check() const { CHECK_NOTNULL(file_reader); From b03a95580bcf3e237cc68bd31a1dfaf31bd6717c Mon Sep 17 00:00:00 2001 From: zhengyu Date: Wed, 26 Mar 2025 22:47:47 +0800 Subject: [PATCH 6/7] response to the reviewer Signed-off-by: zhengyu --- be/src/olap/rowset/segment_v2/column_reader.cpp | 2 +- .../rowset/segment_v2/indexed_column_reader.cpp | 2 +- .../olap/rowset/segment_v2/ordinal_page_index.cpp | 3 +-- be/src/olap/rowset/segment_v2/page_io.cpp | 6 +++--- be/src/olap/rowset/segment_v2/page_io.h | 14 ++++++-------- be/src/olap/rowset/segment_v2/segment.cpp | 4 ++-- 6 files changed, 14 insertions(+), 17 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index ffe4d31f2825f4..06a3d47144d338 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -368,7 +368,7 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag // index page should not pre decode if (iter_opts.type == INDEX_PAGE) opts.pre_decode = false; - return PageIO::read_and_decompress_page_with_file_cache_retry(opts, handle, page_body, footer); + return PageIO::read_and_decompress_page(opts, handle, page_body, footer); } Status ColumnReader::get_row_ranges_by_zone_map( diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index d48ea721f1d69c..154c5073cfc3b0 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -138,7 +138,7 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, if (_is_pk_index) { opts.type = PRIMARY_KEY_INDEX_PAGE; } - auto st = PageIO::read_and_decompress_page_with_file_cache_retry(opts, handle, body, footer); + auto st = PageIO::read_and_decompress_page(opts, handle, body, footer); g_index_reader_compressed_bytes << pp.size; g_index_reader_bytes << footer->uncompressed_size(); g_index_reader_pages << 1; diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp index 4e1dc7130305cf..8faab35cebb342 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp @@ -106,8 +106,7 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory, PageHandle page_handle; Slice body; PageFooterPB footer; - RETURN_IF_ERROR(PageIO::read_and_decompress_page_with_file_cache_retry(opts, &page_handle, - &body, &footer)); + RETURN_IF_ERROR(PageIO::read_and_decompress_page(opts, &page_handle, &body, &footer)); // parse and save all (ordinal, pp) from index page IndexPageReader reader; diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index 4a3dbec93454c7..5d21a989427f5d 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -250,7 +250,7 @@ Status PageIO::read_and_decompress_page_with_file_cache_retry(const PageReadOpti PageHandle* handle, Slice* body, PageFooterPB* footer) { // First try to read with file cache - Status st = read_and_decompress_page(opts, handle, body, footer); + Status st = do_read_and_decompress_page(opts, handle, body, footer); if (!st.is() || !config::is_cloud_mode()) { return st; } @@ -275,7 +275,7 @@ Status PageIO::read_and_decompress_page_with_file_cache_retry(const PageReadOpti } // Retry with file cache - st = read_and_decompress_page(opts, handle, body, footer); + st = do_read_and_decompress_page(opts, handle, body, footer); if (!st.is()) { return st; } @@ -287,7 +287,7 @@ Status PageIO::read_and_decompress_page_with_file_cache_retry(const PageReadOpti PageReadOptions new_opts = opts; new_opts.file_reader = cached_file_reader->get_remote_reader(); - st = read_and_decompress_page(new_opts, handle, body, footer); + st = do_read_and_decompress_page(new_opts, handle, body, footer); if (!st.ok()) { LOG(WARNING) << "Corruption again with retry read directly from remote," << " error msg: " << st.msg() diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index 274d9b4b35c9d0..b3c9a1ca79812e 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -147,18 +147,16 @@ class PageIO { // `body' points to page body, // `footer' stores the page footer. // This method is exception safe, it will failed when allocate memory failed. + // deal with CORRUPTION when using file cache, retry from remote static Status read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, - Slice* body, PageFooterPB* footer) { + Slice* body, PageFooterPB* footer); + +private: + static Status do_read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, + Slice* body, PageFooterPB* footer) { RETURN_IF_CATCH_EXCEPTION( { return read_and_decompress_page_(opts, handle, body, footer); }); } - - // deal with CORRUPTION when using file cache, retry from remote - static Status read_and_decompress_page_with_file_cache_retry(const PageReadOptions& opts, - PageHandle* handle, Slice* body, - PageFooterPB* footer); - -private: // An internal method that not deal with exception. static Status read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle, Slice* body, PageFooterPB* footer); diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index bc9838d98544ab..43e4a116c349d8 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -523,8 +523,8 @@ Status Segment::load_index(OlapReaderStatistics* stats) { Slice body; PageFooterPB footer; - RETURN_IF_ERROR(PageIO::read_and_decompress_page_with_file_cache_retry( - opts, &_sk_index_handle, &body, &footer)); + RETURN_IF_ERROR( + PageIO::read_and_decompress_page(opts, &_sk_index_handle, &body, &footer)); DCHECK_EQ(footer.type(), SHORT_KEY_PAGE); DCHECK(footer.has_short_key_page_footer()); From 8b05b4bf03f1b0490dadac2f40ade00f0b883a62 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Thu, 27 Mar 2025 10:20:34 +0800 Subject: [PATCH 7/7] minor fix Signed-off-by: zhengyu --- be/src/olap/rowset/segment_v2/page_io.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index 5d21a989427f5d..004ebfbc84a683 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -246,9 +246,8 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle return Status::OK(); } -Status PageIO::read_and_decompress_page_with_file_cache_retry(const PageReadOptions& opts, - PageHandle* handle, Slice* body, - PageFooterPB* footer) { +Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, + Slice* body, PageFooterPB* footer) { // First try to read with file cache Status st = do_read_and_decompress_page(opts, handle, body, footer); if (!st.is() || !config::is_cloud_mode()) {