Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end,
return Status::OK();
}

Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment) {
Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment,
OlapReaderStatistics* stats) {
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::Error<INIT_FAILED>("get fs failed");
Expand All @@ -181,7 +182,7 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se

auto s = segment_v2::Segment::open(fs, seg_path, _rowset_meta->tablet_id(), seg_id, rowset_id(),
_schema, reader_options, segment,
_rowset_meta->inverted_index_file_info(seg_id));
_rowset_meta->inverted_index_file_info(seg_id), stats);
if (!s.ok()) {
LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << rowset_id()
<< " : " << s.to_string();
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <vector>

#include "common/status.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_reader.h"
Expand Down Expand Up @@ -76,7 +77,8 @@ class BetaRowset final : public Rowset {
Status load_segments(int64_t seg_id_begin, int64_t seg_id_end,
std::vector<segment_v2::SegmentSharedPtr>* segments);

Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment);
Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment,
OlapReaderStatistics* stats = nullptr);

Status get_segments_size(std::vector<size_t>* segments_size);

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
SCOPED_RAW_TIMER(&_stats->rowset_reader_load_segments_timer_ns);
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
_rowset, &segment_cache_handle, should_use_cache,
/*need_load_pk_index_and_bf*/ false));
/*need_load_pk_index_and_bf*/ false, _stats));
}

// create iterator for each segment
Expand Down
21 changes: 11 additions & 10 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ class InvertedIndexIterator;
Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id,
uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
InvertedIndexFileInfo idx_file_info) {
InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) {
auto s = _open(fs, path, segment_id, rowset_id, tablet_schema, reader_options, output,
idx_file_info);
idx_file_info, stats);
if (!s.ok()) {
if (!config::is_cloud_mode()) {
auto res = ExecEnv::get_tablet(tablet_id);
Expand All @@ -101,14 +101,14 @@ Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tab
Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id,
RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
InvertedIndexFileInfo idx_file_info) {
InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) {
io::FileReaderSPtr file_reader;
RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options));
std::shared_ptr<Segment> segment(
new Segment(segment_id, rowset_id, std::move(tablet_schema), idx_file_info));
segment->_fs = fs;
segment->_file_reader = std::move(file_reader);
auto st = segment->_open();
auto st = segment->_open(stats);
TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st);
if (st.is<ErrorCode::CORRUPTION>() &&
reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) {
Expand All @@ -121,7 +121,7 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s

RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options));
segment->_file_reader = std::move(file_reader);
st = segment->_open();
st = segment->_open(stats);
TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st);
if (st.is<ErrorCode::CORRUPTION>()) { // corrupt again
LOG(WARNING) << "failed to try to read remote source file again with cache support,"
Expand All @@ -134,7 +134,7 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s
opt.cache_type = io::FileCachePolicy::NO_CACHE; // skip cache
RETURN_IF_ERROR(fs->open_file(path, &file_reader, &opt));
segment->_file_reader = std::move(file_reader);
st = segment->_open();
st = segment->_open(stats);
if (!st.ok()) {
LOG(WARNING) << "failed to try to read remote source file directly,"
<< " file path: " << path
Expand Down Expand Up @@ -176,9 +176,9 @@ void Segment::update_metadata_size() {
_tracked_meta_mem_usage = _meta_mem_usage;
}

Status Segment::_open() {
Status Segment::_open(OlapReaderStatistics* stats) {
_footer_pb = std::make_unique<SegmentFooterPB>();
RETURN_IF_ERROR(_parse_footer(_footer_pb.get()));
RETURN_IF_ERROR(_parse_footer(_footer_pb.get(), stats));
_pk_index_meta.reset(_footer_pb->has_primary_key_index_meta()
? new PrimaryKeyIndexMetaPB(_footer_pb->primary_key_index_meta())
: nullptr);
Expand Down Expand Up @@ -387,7 +387,7 @@ Status Segment::_write_error_file(size_t file_size, size_t offset, size_t bytes_
return Status::OK(); // already exists
};

Status Segment::_parse_footer(SegmentFooterPB* footer) {
Status Segment::_parse_footer(SegmentFooterPB* footer, OlapReaderStatistics* stats) {
// Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
auto file_size = _file_reader->size();
if (file_size < 12) {
Expand All @@ -399,7 +399,8 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
uint8_t fixed_buf[12];
size_t bytes_read = 0;
// TODO(plat1ko): Support session variable `enable_file_cache`
io::IOContext io_ctx {.is_index_data = true};
io::IOContext io_ctx {.is_index_data = true,
.file_cache_stats = stats ? &stats->file_cache_stats : nullptr};
RETURN_IF_ERROR(
_file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read, &io_ctx));
DCHECK_EQ(bytes_read, 12);
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd
static Status open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id,
uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options,
std::shared_ptr<Segment>* output, InvertedIndexFileInfo idx_file_info = {});
std::shared_ptr<Segment>* output, InvertedIndexFileInfo idx_file_info = {},
OlapReaderStatistics* stats = nullptr);

static io::UInt128Wrapper file_cache_key(std::string_view rowset_id, uint32_t seg_id);
io::UInt128Wrapper file_cache_key() const {
Expand Down Expand Up @@ -220,10 +221,11 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd
static Status _open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id,
RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options,
std::shared_ptr<Segment>* output, InvertedIndexFileInfo idx_file_info);
std::shared_ptr<Segment>* output, InvertedIndexFileInfo idx_file_info,
OlapReaderStatistics* stats = nullptr);
// open segment file and read the minimum amount of necessary information (footer)
Status _open();
Status _parse_footer(SegmentFooterPB* footer);
Status _open(OlapReaderStatistics* stats);
Status _parse_footer(SegmentFooterPB* footer, OlapReaderStatistics* stats = nullptr);
Status _create_column_readers(const SegmentFooterPB& footer);
Status _load_pk_bloom_filter(OlapReaderStatistics* stats);
ColumnReader* _get_column_reader(const TabletColumn& col);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/segment_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
}
// If the segment is not healthy, then will create a new segment and will replace the unhealthy one in SegmentCache.
segment_v2::SegmentSharedPtr segment;
RETURN_IF_ERROR(rowset->load_segment(i, &segment));
RETURN_IF_ERROR(rowset->load_segment(i, &segment, index_load_stats));
if (need_load_pk_index_and_bf) {
RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats));
}
Expand Down
Loading