Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions be/src/cloud/injection_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
#include "http/http_channel.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "io/cache/cached_remote_file_reader.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/segment_v2/page_io.h"
#include "util/stack_util.h"

namespace doris {
Expand Down Expand Up @@ -133,6 +135,26 @@ void register_suites() {
*arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error");
});
});
// curl "be_ip:http_port/api/injection_point/apply_suite/PageIO::read_and_decompress_page:crc_failure"
suite_map.emplace("PageIO::read_and_decompress_page:crc_failure", [] {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("PageIO::read_and_decompress_page:crc_failure_inj", [](auto&& args) {
LOG(INFO) << "PageIO::read_and_decompress_page:crc_failure_inj";
if (auto ctx = std::any_cast<segment_v2::InjectionContext*>(args[0])) {
uint32_t* crc = ctx->crc;
segment_v2::PageReadOptions* opts = ctx->opts;
auto cached_file_reader =
dynamic_cast<io::CachedRemoteFileReader*>(opts->file_reader);
if (cached_file_reader == nullptr) {
return; // if not cachedreader, then do nothing
} else {
memset(crc, 0, 32);
}
} else {
std::cerr << "Failed to cast std::any to InjectionContext*" << std::endl;
}
});
});
// curl be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn'
suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] {
auto* sp = SyncPoint::get_instance();
Expand Down
23 changes: 11 additions & 12 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,18 +355,17 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
BlockCompressionCodec* codec) const {
iter_opts.sanity_check();
PageReadOptions opts {
.verify_checksum = _opts.verify_checksum,
.use_page_cache = iter_opts.use_page_cache,
.kept_in_memory = _opts.kept_in_memory,
.type = iter_opts.type,
.file_reader = iter_opts.file_reader,
.page_pointer = pp,
.codec = codec,
.stats = iter_opts.stats,
.encoding_info = _encoding_info,
.io_ctx = iter_opts.io_ctx,
};
PageReadOptions opts(iter_opts.io_ctx);
opts.verify_checksum = _opts.verify_checksum;
opts.use_page_cache = iter_opts.use_page_cache;
opts.kept_in_memory = _opts.kept_in_memory;
opts.type = iter_opts.type;
opts.file_reader = iter_opts.file_reader;
opts.page_pointer = pp;
opts.codec = codec;
opts.stats = iter_opts.stats;
opts.encoding_info = _encoding_info;

// index page should not pre decode
if (iter_opts.type == INDEX_PAGE) opts.pre_decode = false;
return PageIO::read_and_decompress_page(opts, handle, page_body, footer);
Expand Down
25 changes: 12 additions & 13 deletions be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,18 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle,
OlapReaderStatistics* stats) const {
OlapReaderStatistics tmp_stats;
OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
PageReadOptions opts {
.use_page_cache = _use_page_cache,
.kept_in_memory = _kept_in_memory,
.pre_decode = pre_decode,
.type = type,
.file_reader = _file_reader.get(),
.page_pointer = pp,
.codec = codec,
.stats = stats_ptr,
.encoding_info = _encoding_info,
.io_ctx = io::IOContext {.is_index_data = true,
.file_cache_stats = &stats_ptr->file_cache_stats},
};
PageReadOptions opts(io::IOContext {.is_index_data = true,
.file_cache_stats = &stats_ptr->file_cache_stats});
opts.use_page_cache = _use_page_cache;
opts.kept_in_memory = _kept_in_memory;
opts.pre_decode = pre_decode;
opts.type = type;
opts.file_reader = _file_reader.get();
opts.page_pointer = pp;
opts.codec = codec;
opts.stats = stats_ptr;
opts.encoding_info = _encoding_info;

if (_is_pk_index) {
opts.type = PRIMARY_KEY_INDEX_PAGE;
}
Expand Down
22 changes: 10 additions & 12 deletions be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,16 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory,
// need to read index page
OlapReaderStatistics tmp_stats;
OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
PageReadOptions opts {
.use_page_cache = use_page_cache,
.kept_in_memory = kept_in_memory,
.type = INDEX_PAGE,
.file_reader = _file_reader.get(),
.page_pointer = PagePointer(index_meta->root_page().root_page()),
// ordinal index page uses NO_COMPRESSION right now
.codec = nullptr,
.stats = stats_ptr,
.io_ctx = io::IOContext {.is_index_data = true,
.file_cache_stats = &stats_ptr->file_cache_stats},
};
PageReadOptions opts(io::IOContext {.is_index_data = true,
.file_cache_stats = &stats_ptr->file_cache_stats});
opts.use_page_cache = use_page_cache;
opts.kept_in_memory = kept_in_memory;
opts.type = INDEX_PAGE;
opts.file_reader = _file_reader.get();
opts.page_pointer = PagePointer(index_meta->root_page().root_page());
// ordinal index page uses NO_COMPRESSION right now
opts.codec = nullptr;
opts.stats = stats_ptr;

// read index page
PageHandle page_handle;
Expand Down
67 changes: 67 additions & 0 deletions be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
#include <string>
#include <utility>

#include "cloud/config.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "gutil/strings/substitute.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/cached_remote_file_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
#include "olap/olap_common.h"
Expand Down Expand Up @@ -111,6 +116,15 @@ Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body
return Status::OK();
}

io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0
return io::BlockFileCache::hash(base);
}

std::string file_cache_key_str(const std::string& seg_path) {
return file_cache_key_from_path(seg_path).to_string();
}

Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
opts.sanity_check();
Expand Down Expand Up @@ -159,6 +173,9 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle
if (opts.verify_checksum) {
uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
uint32_t actual = crc32c::Value(page_slice.data, page_slice.size - 4);
InjectionContext ctx = {&actual, const_cast<PageReadOptions*>(&opts)};
(void)ctx;
TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj", &ctx);
if (expect != actual) {
return Status::Corruption(
"Bad page: checksum mismatch (actual={} vs expect={}), file={}", actual, expect,
Expand Down Expand Up @@ -229,5 +246,55 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle
return Status::OK();
}

Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
// First try to read with file cache
Status st = do_read_and_decompress_page(opts, handle, body, footer);
if (!st.is<ErrorCode::CORRUPTION>() || !config::is_cloud_mode()) {
return st;
}

auto* cached_file_reader = dynamic_cast<io::CachedRemoteFileReader*>(opts.file_reader);
if (cached_file_reader == nullptr) {
return st;
}

// If we get CORRUPTION error and using file cache, clear cache and retry
LOG(WARNING) << "Bad page may be read from file cache, need retry."
<< " error msg: " << st.msg()
<< " file path: " << opts.file_reader->path().native()
<< " offset: " << opts.page_pointer.offset;

// Remove cache if exists
const std::string path = opts.file_reader->path().string();
auto file_key = file_cache_key_from_path(path);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
if (file_cache) {
file_cache->remove_if_cached(file_key);
}

// Retry with file cache
st = do_read_and_decompress_page(opts, handle, body, footer);
if (!st.is<ErrorCode::CORRUPTION>()) {
return st;
}

LOG(WARNING) << "Corruption again with retry downloading cache,"
<< " error msg: " << st.msg()
<< " file path: " << opts.file_reader->path().native()
<< " offset: " << opts.page_pointer.offset;

PageReadOptions new_opts = opts;
new_opts.file_reader = cached_file_reader->get_remote_reader();
st = do_read_and_decompress_page(new_opts, handle, body, footer);
if (!st.ok()) {
LOG(WARNING) << "Corruption again with retry read directly from remote,"
<< " error msg: " << st.msg()
<< " file path: " << opts.file_reader->path().native()
<< " offset: " << opts.page_pointer.offset << " Give up.";
}
return st;
}

} // namespace segment_v2
} // namespace doris
34 changes: 30 additions & 4 deletions be/src/olap/rowset/segment_v2/page_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "common/logging.h"
#include "common/status.h"
#include "io/cache/block_file_cache.h"
#include "io/io_common.h"
#include "olap/rowset/segment_v2/page_pointer.h"
#include "util/slice.h"
Expand All @@ -41,6 +42,9 @@ namespace segment_v2 {
class EncodingInfo;
class PageHandle;

io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path);
std::string file_cache_key_str(const std::string& seg_path);

struct PageReadOptions {
// whether to verify page checksum
bool verify_checksum = true;
Expand All @@ -66,12 +70,31 @@ struct PageReadOptions {

const EncodingInfo* encoding_info = nullptr;

const io::IOContext& io_ctx;
const io::IOContext io_ctx;

void sanity_check() const {
CHECK_NOTNULL(file_reader);
CHECK_NOTNULL(stats);
}
PageReadOptions(const io::IOContext& ioctx) : io_ctx(ioctx) {}

PageReadOptions(const PageReadOptions& old) : io_ctx(old.io_ctx) {
file_reader = old.file_reader;
page_pointer = old.page_pointer;
codec = old.codec;
stats = old.stats;
verify_checksum = old.verify_checksum;
use_page_cache = old.use_page_cache;
kept_in_memory = old.kept_in_memory;
type = old.type;
encoding_info = old.encoding_info;
pre_decode = old.pre_decode;
}
};

struct InjectionContext {
uint32_t* crc;
PageReadOptions* opts;
};

inline std::ostream& operator<<(std::ostream& os, const PageReadOptions& opt) {
Expand Down Expand Up @@ -124,13 +147,16 @@ class PageIO {
// `body' points to page body,
// `footer' stores the page footer.
// This method is exception safe, it will failed when allocate memory failed.
// deal with CORRUPTION when using file cache, retry from remote
static Status read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
Slice* body, PageFooterPB* footer);

private:
static Status do_read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
RETURN_IF_CATCH_EXCEPTION(
{ return read_and_decompress_page_(opts, handle, body, footer); });
}

private:
// An internal method that not deal with exception.
static Status read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer);
Expand Down
30 changes: 10 additions & 20 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,6 @@ namespace doris::segment_v2 {

class InvertedIndexIterator;

io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0
return io::BlockFileCache::hash(base);
}

std::string file_cache_key_str(const std::string& seg_path) {
return file_cache_key_from_path(seg_path).to_string();
}

Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id,
uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
Expand Down Expand Up @@ -520,17 +511,16 @@ Status Segment::load_index(OlapReaderStatistics* stats) {
// read and parse short key index page
OlapReaderStatistics tmp_stats;
OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
PageReadOptions opts {
.use_page_cache = true,
.type = INDEX_PAGE,
.file_reader = _file_reader.get(),
.page_pointer = PagePointer(_sk_index_page),
// short key index page uses NO_COMPRESSION for now
.codec = nullptr,
.stats = &tmp_stats,
.io_ctx = io::IOContext {.is_index_data = true,
.file_cache_stats = &stats_ptr->file_cache_stats},
};
PageReadOptions opts(io::IOContext {.is_index_data = true,
.file_cache_stats = &stats_ptr->file_cache_stats});
opts.use_page_cache = true;
opts.type = INDEX_PAGE;
opts.file_reader = _file_reader.get();
opts.page_pointer = PagePointer(_sk_index_page);
// short key index page uses NO_COMPRESSION for now
opts.codec = nullptr;
opts.stats = &tmp_stats;

Slice body;
PageFooterPB footer;
RETURN_IF_ERROR(
Expand Down
Loading