Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,15 @@ Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end,
int64_t seg_id = seg_id_begin;
while (seg_id < seg_id_end) {
std::shared_ptr<segment_v2::Segment> segment;
RETURN_IF_ERROR(load_segment(seg_id, &segment));
RETURN_IF_ERROR(load_segment(seg_id, nullptr, &segment));
segments->push_back(std::move(segment));
seg_id++;
}
return Status::OK();
}

Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment) {
Status BetaRowset::load_segment(int64_t seg_id, OlapReaderStatistics* stats,
segment_v2::SegmentSharedPtr* segment) {
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::Error<INIT_FAILED>("get fs failed");
Expand All @@ -196,7 +197,7 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se

auto s = segment_v2::Segment::open(fs, seg_path, _rowset_meta->tablet_id(), seg_id, rowset_id(),
_schema, reader_options, segment,
_rowset_meta->inverted_index_file_info(seg_id));
_rowset_meta->inverted_index_file_info(seg_id), stats);
if (!s.ok()) {
LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << rowset_id()
<< " : " << s.to_string();
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <vector>

#include "common/status.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_reader.h"
Expand Down Expand Up @@ -76,7 +77,8 @@ class BetaRowset final : public Rowset {
Status load_segments(int64_t seg_id_begin, int64_t seg_id_end,
std::vector<segment_v2::SegmentSharedPtr>* segments);

Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment);
Status load_segment(int64_t seg_id, OlapReaderStatistics* read_stats,
segment_v2::SegmentSharedPtr* segment);

Status get_segments_size(std::vector<size_t>* segments_size);

Expand Down
9 changes: 4 additions & 5 deletions be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ LazyInitSegmentIterator::LazyInitSegmentIterator(BetaRowsetSharedPtr rowset, int
_schema(std::move(schema)),
_read_options(opts) {}

/// Here do not use the argument of `opts`,
/// see where the iterator is created in `BetaRowsetReader::get_segment_iterators`
Status LazyInitSegmentIterator::init(const StorageReadOptions& /*opts*/) {
/// See where the iterator is created in `BetaRowsetReader::get_segment_iterators`
Status LazyInitSegmentIterator::init(const StorageReadOptions& opts) {
_need_lazy_init = false;
if (_inner_iterator) {
return Status::OK();
Expand All @@ -42,12 +41,12 @@ Status LazyInitSegmentIterator::init(const StorageReadOptions& /*opts*/) {
{
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
_rowset, _segment_id, &segment_cache_handle, _should_use_cache, false));
_rowset, _segment_id, &segment_cache_handle, _should_use_cache, false, opts.stats));
const auto& tmp_segments = segment_cache_handle.get_segments();
segment = tmp_segments[0];
}
RETURN_IF_ERROR(segment->new_iterator(_schema, _read_options, &_inner_iterator));
return _inner_iterator->init(_read_options);
}

} // namespace doris::segment_v2
} // namespace doris::segment_v2
29 changes: 16 additions & 13 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ class InvertedIndexIterator;
Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id,
uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
InvertedIndexFileInfo idx_file_info) {
InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) {
auto s = _open(fs, path, segment_id, rowset_id, tablet_schema, reader_options, output,
idx_file_info);
idx_file_info, stats);
if (!s.ok()) {
if (!config::is_cloud_mode()) {
auto res = ExecEnv::get_tablet(tablet_id);
Expand All @@ -101,14 +101,14 @@ Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tab
Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id,
RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
InvertedIndexFileInfo idx_file_info) {
InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) {
io::FileReaderSPtr file_reader;
RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options));
std::shared_ptr<Segment> segment(
new Segment(segment_id, rowset_id, std::move(tablet_schema), idx_file_info));
segment->_fs = fs;
segment->_file_reader = std::move(file_reader);
auto st = segment->_open();
auto st = segment->_open(stats);
TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st);
if (st.is<ErrorCode::CORRUPTION>() &&
reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) {
Expand All @@ -121,7 +121,7 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s

RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options));
segment->_file_reader = std::move(file_reader);
st = segment->_open();
st = segment->_open(stats);
TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st);
if (st.is<ErrorCode::CORRUPTION>()) { // corrupt again
LOG(WARNING) << "failed to try to read remote source file again with cache support,"
Expand All @@ -134,7 +134,7 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s
opt.cache_type = io::FileCachePolicy::NO_CACHE; // skip cache
RETURN_IF_ERROR(fs->open_file(path, &file_reader, &opt));
segment->_file_reader = std::move(file_reader);
st = segment->_open();
st = segment->_open(stats);
if (!st.ok()) {
LOG(WARNING) << "failed to try to read remote source file directly,"
<< " file path: " << path
Expand Down Expand Up @@ -177,9 +177,9 @@ void Segment::update_metadata_size() {
_tracked_meta_mem_usage = _meta_mem_usage;
}

Status Segment::_open() {
Status Segment::_open(OlapReaderStatistics* stats) {
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared));
RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, stats));

_pk_index_meta.reset(
footer_pb_shared->has_primary_key_index_meta()
Expand Down Expand Up @@ -390,7 +390,8 @@ Status Segment::_write_error_file(size_t file_size, size_t offset, size_t bytes_
return Status::OK(); // already exists
};

Status Segment::_parse_footer(std::shared_ptr<SegmentFooterPB>& footer) {
Status Segment::_parse_footer(std::shared_ptr<SegmentFooterPB>& footer,
OlapReaderStatistics* stats) {
// Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
auto file_size = _file_reader->size();
if (file_size < 12) {
Expand All @@ -402,7 +403,8 @@ Status Segment::_parse_footer(std::shared_ptr<SegmentFooterPB>& footer) {
uint8_t fixed_buf[12];
size_t bytes_read = 0;
// TODO(plat1ko): Support session variable `enable_file_cache`
io::IOContext io_ctx {.is_index_data = true};
io::IOContext io_ctx {.is_index_data = true,
.file_cache_stats = stats ? &stats->file_cache_stats : nullptr};
RETURN_IF_ERROR(
_file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read, &io_ctx));
DCHECK_EQ(bytes_read, 12);
Expand Down Expand Up @@ -606,7 +608,7 @@ Status Segment::_create_column_readers_once(OlapReaderStatistics* stats) {
SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);
return _create_column_readers_once_call.call([&] {
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared));
RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, stats));
return _create_column_readers(*footer_pb_shared);
});
}
Expand Down Expand Up @@ -1185,7 +1187,8 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto
return Status::OK();
}

Status Segment::_get_segment_footer(std::shared_ptr<SegmentFooterPB>& footer_pb) {
Status Segment::_get_segment_footer(std::shared_ptr<SegmentFooterPB>& footer_pb,
OlapReaderStatistics* stats) {
std::shared_ptr<SegmentFooterPB> footer_pb_shared = _footer_pb.lock();
if (footer_pb_shared != nullptr) {
footer_pb = footer_pb_shared;
Expand All @@ -1205,7 +1208,7 @@ Status Segment::_get_segment_footer(std::shared_ptr<SegmentFooterPB>& footer_pb)

if (!segment_footer_cache->lookup(cache_key, &cache_handle,
segment_v2::PageTypePB::DATA_PAGE)) {
RETURN_IF_ERROR(_parse_footer(footer_pb_shared));
RETURN_IF_ERROR(_parse_footer(footer_pb_shared, stats));
segment_footer_cache->insert(cache_key, footer_pb_shared, footer_pb_shared->ByteSizeLong(),
&cache_handle, segment_v2::PageTypePB::DATA_PAGE);
} else {
Expand Down
12 changes: 7 additions & 5 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd
static Status open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id,
uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options,
std::shared_ptr<Segment>* output, InvertedIndexFileInfo idx_file_info = {});
std::shared_ptr<Segment>* output, InvertedIndexFileInfo idx_file_info = {},
OlapReaderStatistics* stats = nullptr);

static io::UInt128Wrapper file_cache_key(std::string_view rowset_id, uint32_t seg_id);
io::UInt128Wrapper file_cache_key() const {
Expand Down Expand Up @@ -225,10 +226,11 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd
static Status _open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id,
RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options,
std::shared_ptr<Segment>* output, InvertedIndexFileInfo idx_file_info);
std::shared_ptr<Segment>* output, InvertedIndexFileInfo idx_file_info,
OlapReaderStatistics* stats);
// open segment file and read the minimum amount of necessary information (footer)
Status _open();
Status _parse_footer(std::shared_ptr<SegmentFooterPB>& footer);
Status _open(OlapReaderStatistics* stats);
Status _parse_footer(std::shared_ptr<SegmentFooterPB>& footer, OlapReaderStatistics* stats);
Status _create_column_readers(const SegmentFooterPB& footer);
Status _load_pk_bloom_filter(OlapReaderStatistics* stats);
ColumnReader* _get_column_reader(const TabletColumn& col);
Expand All @@ -245,7 +247,7 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd

Status _create_column_readers_once(OlapReaderStatistics* stats);

Status _get_segment_footer(std::shared_ptr<SegmentFooterPB>&);
Status _get_segment_footer(std::shared_ptr<SegmentFooterPB>&, OlapReaderStatistics* stats);

StoragePageCache::CacheKey get_segment_footer_cache_key() const;

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/segment_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Status SegmentLoader::load_segment(const BetaRowsetSharedPtr& rowset, int64_t se
}
// If the segment is not healthy, then will create a new segment and will replace the unhealthy one in SegmentCache.
segment_v2::SegmentSharedPtr segment;
RETURN_IF_ERROR(rowset->load_segment(segment_id, &segment));
RETURN_IF_ERROR(rowset->load_segment(segment_id, index_load_stats, &segment));
if (need_load_pk_index_and_bf) {
RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats));
}
Expand Down
8 changes: 4 additions & 4 deletions be/test/olap/date_bloom_filter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ TEST_F(DateBloomFilterTest, query_index_test) {
EXPECT_TRUE(_tablet->add_rowset(rowset).ok());

segment_v2::SegmentSharedPtr segment;
EXPECT_TRUE(((BetaRowset*)rowset.get())->load_segment(0, &segment).ok());
EXPECT_TRUE(((BetaRowset*)rowset.get())->load_segment(0, nullptr, &segment).ok());
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
auto st = segment->_get_segment_footer(footer_pb_shared);
auto st = segment->_get_segment_footer(footer_pb_shared, nullptr);
EXPECT_TRUE(st.ok());
st = segment->_create_column_readers(*footer_pb_shared);
EXPECT_TRUE(st.ok());
Expand Down Expand Up @@ -230,9 +230,9 @@ TEST_F(DateBloomFilterTest, in_list_predicate_test) {
EXPECT_TRUE(_tablet->add_rowset(rowset).ok());

segment_v2::SegmentSharedPtr segment;
EXPECT_TRUE(((BetaRowset*)rowset.get())->load_segment(0, &segment).ok());
EXPECT_TRUE(((BetaRowset*)rowset.get())->load_segment(0, nullptr, &segment).ok());
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
auto st = segment->_get_segment_footer(footer_pb_shared);
auto st = segment->_get_segment_footer(footer_pb_shared, nullptr);
EXPECT_TRUE(st.ok());
st = segment->_create_column_readers(*(footer_pb_shared));
EXPECT_TRUE(st.ok());
Expand Down
8 changes: 4 additions & 4 deletions be/test/olap/segment_footer_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,13 @@ class SegmentFooterCacheTest : public ::testing::Test {
TEST_F(SegmentFooterCacheTest, TestGetSegmentFooter) {
for (auto segment_ptr : _segments) {
std::shared_ptr<segment_v2::SegmentFooterPB> footer;
Status st = segment_ptr->_get_segment_footer(footer);
Status st = segment_ptr->_get_segment_footer(footer, nullptr);
ASSERT_TRUE(st.ok());
}

for (auto segment_ptr : _segments) {
std::shared_ptr<segment_v2::SegmentFooterPB> footer;
Status st = segment_ptr->_get_segment_footer(footer);
Status st = segment_ptr->_get_segment_footer(footer, nullptr);
ASSERT_TRUE(st.ok());
}
}
Expand All @@ -179,7 +179,7 @@ TEST_F(SegmentFooterCacheTest, TestSemgnetFooterPBPage) {
StoragePageCache cache(16 * 2048, 0, 0, 16);
for (auto segment_ptr : _segments) {
std::shared_ptr<segment_v2::SegmentFooterPB> footer;
Status st = segment_ptr->_get_segment_footer(footer);
Status st = segment_ptr->_get_segment_footer(footer, nullptr);
ASSERT_TRUE(st.ok());
PageCacheHandle cache_handle;
cache.insert(segment_ptr->get_segment_footer_cache_key(), footer, footer->ByteSizeLong(),
Expand All @@ -192,4 +192,4 @@ TEST_F(SegmentFooterCacheTest, TestSemgnetFooterPBPage) {
}
}

} // namespace doris
} // namespace doris
Loading