Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ Status _get_segment_column_iterator(const BetaRowsetSharedPtr& rowset, uint32_t
.use_page_cache = !config::disable_storage_page_cache,
.file_reader = segment->file_reader().get(),
.stats = stats,
.io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY},
.io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY,
.file_cache_stats = &stats->file_cache_stats},
};
RETURN_IF_ERROR((*column_iterator)->init(opt));
return Status::OK();
Expand Down Expand Up @@ -441,7 +442,8 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
const std::vector<RowsetSharedPtr>& specified_rowsets,
RowLocation* row_location, uint32_t version,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
RowsetSharedPtr* rowset, bool with_rowid) {
RowsetSharedPtr* rowset, bool with_rowid,
OlapReaderStatistics* stats) {
SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency);
size_t seq_col_length = 0;
// use the latest tablet schema to decide if the tablet has sequence column currently
Expand Down Expand Up @@ -489,7 +491,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest

for (auto id : picked_segments) {
Status s = segments[id]->lookup_row_key(encoded_key, schema, with_seq_col, with_rowid,
&loc);
&loc, stats);
if (s.is<KEY_NOT_FOUND>()) {
continue;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ class BaseTablet {
const std::vector<RowsetSharedPtr>& specified_rowsets,
RowLocation* row_location, uint32_t version,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
RowsetSharedPtr* rowset = nullptr, bool with_rowid = true);
RowsetSharedPtr* rowset = nullptr, bool with_rowid = true,
OlapReaderStatistics* stats = nullptr);

// calc delete bitmap when flush memtable, use a fake version to calc
// For example, cur max version is 5, and we use version 6 to calc but
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/primary_key_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "olap/primary_key_index.h"

#include <butil/time.h>
#include <gen_cpp/segment_v2.pb.h>

#include <utility>
Expand Down Expand Up @@ -95,7 +96,8 @@ Status PrimaryKeyIndexReader::parse_index(io::FileReaderSPtr file_reader,
// parse primary key index
_index_reader.reset(new segment_v2::IndexedColumnReader(file_reader, meta.primary_key_index()));
_index_reader->set_is_pk_index(true);
RETURN_IF_ERROR(_index_reader->load(!config::disable_pk_storage_page_cache, false));
RETURN_IF_ERROR(_index_reader->load(!config::disable_pk_storage_page_cache, false,
_pk_index_load_stats));

_index_parsed = true;
return Status::OK();
Expand All @@ -107,7 +109,8 @@ Status PrimaryKeyIndexReader::parse_bf(io::FileReaderSPtr file_reader,
segment_v2::ColumnIndexMetaPB column_index_meta = meta.bloom_filter_index();
segment_v2::BloomFilterIndexReader bf_index_reader(std::move(file_reader),
column_index_meta.bloom_filter_index());
RETURN_IF_ERROR(bf_index_reader.load(!config::disable_pk_storage_page_cache, false));
RETURN_IF_ERROR(bf_index_reader.load(!config::disable_pk_storage_page_cache, false,
_pk_index_load_stats));
std::unique_ptr<segment_v2::BloomFilterIndexIterator> bf_iter;
RETURN_IF_ERROR(bf_index_reader.new_iterator(&bf_iter));
RETURN_IF_ERROR(bf_iter->read_bloom_filter(0, &_bf));
Expand Down
10 changes: 7 additions & 3 deletions be/src/olap/primary_key_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/rowset/segment_v2/bloom_filter_index_writer.h"
#include "olap/rowset/segment_v2/indexed_column_reader.h"
Expand Down Expand Up @@ -97,7 +98,8 @@ class PrimaryKeyIndexBuilder {

class PrimaryKeyIndexReader {
public:
PrimaryKeyIndexReader() : _index_parsed(false), _bf_parsed(false) {}
PrimaryKeyIndexReader(OlapReaderStatistics* pk_index_load_stats = nullptr)
: _index_parsed(false), _bf_parsed(false), _pk_index_load_stats(pk_index_load_stats) {}

~PrimaryKeyIndexReader() {
segment_v2::g_pk_total_bloom_filter_num << -static_cast<int64_t>(_bf_num);
Expand All @@ -111,9 +113,10 @@ class PrimaryKeyIndexReader {

Status parse_bf(io::FileReaderSPtr file_reader, const segment_v2::PrimaryKeyIndexMetaPB& meta);

Status new_iterator(std::unique_ptr<segment_v2::IndexedColumnIterator>* index_iterator) const {
Status new_iterator(std::unique_ptr<segment_v2::IndexedColumnIterator>* index_iterator,
OlapReaderStatistics* stats = nullptr) const {
DCHECK(_index_parsed);
index_iterator->reset(new segment_v2::IndexedColumnIterator(_index_reader.get()));
index_iterator->reset(new segment_v2::IndexedColumnIterator(_index_reader.get(), stats));
return Status::OK();
}

Expand Down Expand Up @@ -152,6 +155,7 @@ class PrimaryKeyIndexReader {
std::unique_ptr<segment_v2::BloomFilter> _bf;
size_t _bf_num = 0;
uint64 _bf_bytes = 0;
OlapReaderStatistics* _pk_index_load_stats = nullptr;
};

} // namespace doris
6 changes: 4 additions & 2 deletions be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
namespace doris {
namespace segment_v2 {

Status BloomFilterIndexReader::load(bool use_page_cache, bool kept_in_memory) {
Status BloomFilterIndexReader::load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* index_load_stats) {
// TODO yyq: implement a new once flag to avoid status construct.
_index_load_stats = index_load_stats;
return _load_once.call([this, use_page_cache, kept_in_memory] {
return _load(use_page_cache, kept_in_memory);
});
Expand All @@ -42,7 +44,7 @@ Status BloomFilterIndexReader::_load(bool use_page_cache, bool kept_in_memory) {
const IndexedColumnMetaPB& bf_index_meta = _bloom_filter_index_meta->bloom_filter();

_bloom_filter_reader.reset(new IndexedColumnReader(_file_reader, bf_index_meta));
RETURN_IF_ERROR(_bloom_filter_reader->load(use_page_cache, kept_in_memory));
RETURN_IF_ERROR(_bloom_filter_reader->load(use_page_cache, kept_in_memory, _index_load_stats));
return Status::OK();
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class BloomFilterIndexReader {
_bloom_filter_index_meta.reset(new BloomFilterIndexPB(bloom_filter_index_meta));
}

Status load(bool use_page_cache, bool kept_in_memory);
Status load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* _bf_index_load_stats = nullptr);

BloomFilterAlgorithmPB algorithm() { return _bloom_filter_index_meta->algorithm(); }

Expand All @@ -67,6 +68,7 @@ class BloomFilterIndexReader {
const TypeInfo* _type_info = nullptr;
std::unique_ptr<BloomFilterIndexPB> _bloom_filter_index_meta = nullptr;
std::unique_ptr<IndexedColumnReader> _bloom_filter_reader;
OlapReaderStatistics* _index_load_stats = nullptr;
};

class BloomFilterIndexIterator {
Expand Down
19 changes: 12 additions & 7 deletions be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ static bvar::Adder<uint64_t> g_index_reader_memory_bytes("doris_index_reader_mem

using strings::Substitute;

Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* index_load_stats) {
_use_page_cache = use_page_cache;
_kept_in_memory = kept_in_memory;
_index_load_stats = index_load_stats;

_type_info = get_scalar_type_info((FieldType)_meta.data_type());
if (_type_info == nullptr) {
Expand Down Expand Up @@ -105,16 +107,18 @@ Status IndexedColumnReader::load_index_page(const PagePointerPB& pp, PageHandle*
BlockCompressionCodec* local_compress_codec;
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &local_compress_codec));
RETURN_IF_ERROR(read_page(PagePointer(pp), handle, &body, &footer, INDEX_PAGE,
local_compress_codec, false));
local_compress_codec, false, _index_load_stats));
RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer()));
_mem_size += body.get_size();
return Status::OK();
}

Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, Slice* body,
PageFooterPB* footer, PageTypePB type,
BlockCompressionCodec* codec, bool pre_decode) const {
BlockCompressionCodec* codec, bool pre_decode,
OlapReaderStatistics* stats) const {
OlapReaderStatistics tmp_stats;
OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
PageReadOptions opts {
.use_page_cache = _use_page_cache,
.kept_in_memory = _kept_in_memory,
Expand All @@ -123,9 +127,10 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle,
.file_reader = _file_reader.get(),
.page_pointer = pp,
.codec = codec,
.stats = &tmp_stats,
.stats = stats_ptr,
.encoding_info = _encoding_info,
.io_ctx = io::IOContext {.is_index_data = true},
.io_ctx = io::IOContext {.is_index_data = true,
.file_cache_stats = &stats_ptr->file_cache_stats},
};
if (_is_pk_index) {
opts.type = PRIMARY_KEY_INDEX_PAGE;
Expand Down Expand Up @@ -154,8 +159,8 @@ Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) {
PageHandle handle;
Slice body;
PageFooterPB footer;
RETURN_IF_ERROR(
_reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec, true));
RETURN_IF_ERROR(_reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec,
true, _stats));
// parse data page
// note that page_index is not used in IndexedColumnIterator, so we pass 0
PageDecoderOptions opts;
Expand Down
15 changes: 11 additions & 4 deletions be/src/olap/rowset/segment_v2/indexed_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/common.h"
#include "olap/rowset/segment_v2/index_page.h"
#include "olap/rowset/segment_v2/page_handle.h"
Expand All @@ -53,11 +54,13 @@ class IndexedColumnReader {

~IndexedColumnReader();

Status load(bool use_page_cache, bool kept_in_memory);
Status load(bool use_page_cache, bool kept_in_memory,
OlapReaderStatistics* index_load_stats = nullptr);

// read a page specified by `pp' from `file' into `handle'
Status read_page(const PagePointer& pp, PageHandle* handle, Slice* body, PageFooterPB* footer,
PageTypePB type, BlockCompressionCodec* codec, bool pre_decode) const;
PageTypePB type, BlockCompressionCodec* codec, bool pre_decode,
OlapReaderStatistics* stats = nullptr) const;

int64_t num_values() const { return _num_values; }
const EncodingInfo* encoding_info() const { return _encoding_info; }
Expand Down Expand Up @@ -95,14 +98,17 @@ class IndexedColumnReader {
const KeyCoder* _value_key_coder = nullptr;
uint64_t _mem_size = 0;
bool _is_pk_index = false;
OlapReaderStatistics* _index_load_stats = nullptr;
};

class IndexedColumnIterator {
public:
explicit IndexedColumnIterator(const IndexedColumnReader* reader)
explicit IndexedColumnIterator(const IndexedColumnReader* reader,
OlapReaderStatistics* stats = nullptr)
: _reader(reader),
_ordinal_iter(&reader->_ordinal_index_reader),
_value_iter(&reader->_value_index_reader) {}
_value_iter(&reader->_value_index_reader),
_stats(stats) {}

// Seek to the given ordinal entry. Entry 0 is the first entry.
// Return Status::Error<ENTRY_NOT_FOUND> if provided seek point is past the end.
Expand Down Expand Up @@ -151,6 +157,7 @@ class IndexedColumnIterator {
ordinal_t _current_ordinal = 0;
// iterator owned compress codec, should NOT be shared by threads, initialized before used
BlockCompressionCodec* _compress_codec = nullptr;
OlapReaderStatistics* _stats = nullptr;
};

} // namespace segment_v2
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ Status Segment::_load_pk_bloom_filter() {
});
}

Status Segment::load_pk_index_and_bf() {
Status Segment::load_pk_index_and_bf(OlapReaderStatistics* index_load_stats) {
_pk_index_load_stats = index_load_stats;
RETURN_IF_ERROR(load_index());
RETURN_IF_ERROR(_load_pk_bloom_filter());
return Status::OK();
Expand All @@ -467,7 +468,7 @@ Status Segment::load_pk_index_and_bf() {
Status Segment::load_index() {
return _load_index_once.call([this] {
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr) {
_pk_index_reader = std::make_unique<PrimaryKeyIndexReader>();
_pk_index_reader = std::make_unique<PrimaryKeyIndexReader>(_pk_index_load_stats);
RETURN_IF_ERROR(_pk_index_reader->parse_index(_file_reader, *_pk_index_meta));
// _meta_mem_usage += _pk_index_reader->get_memory_size();
return Status::OK();
Expand Down Expand Up @@ -926,7 +927,8 @@ Status Segment::new_inverted_index_iterator(const TabletColumn& tablet_column,
}

Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_schema,
bool with_seq_col, bool with_rowid, RowLocation* row_location) {
bool with_seq_col, bool with_rowid, RowLocation* row_location,
OlapReaderStatistics* stats) {
RETURN_IF_ERROR(load_pk_index_and_bf());
bool has_seq_col = latest_schema->has_sequence_col();
bool has_rowid = !latest_schema->cluster_key_idxes().empty();
Expand All @@ -946,7 +948,7 @@ Status Segment::lookup_row_key(const Slice& key, const TabletSchema* latest_sche
}
bool exact_match = false;
std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator;
RETURN_IF_ERROR(_pk_index_reader->new_iterator(&index_iterator));
RETURN_IF_ERROR(_pk_index_reader->new_iterator(&index_iterator, stats));
auto st = index_iterator->seek_at_or_after(&key_without_seq, &exact_match);
if (!st.ok() && !st.is<ErrorCode::ENTRY_NOT_FOUND>()) {
return st;
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
}

Status lookup_row_key(const Slice& key, const TabletSchema* latest_schema, bool with_seq_col,
bool with_rowid, RowLocation* row_location);
bool with_rowid, RowLocation* row_location,
OlapReaderStatistics* stats = nullptr);

Status read_key_by_rowid(uint32_t row_id, std::string* key);

Expand All @@ -140,7 +141,7 @@ class Segment : public std::enable_shared_from_this<Segment> {

Status load_index();

Status load_pk_index_and_bf();
Status load_pk_index_and_bf(OlapReaderStatistics* index_load_stats = nullptr);

void update_healthy_status(Status new_status) { _healthy_status.update(new_status); }
// The segment is loaded into SegmentCache and then will load indices, if there are something wrong
Expand Down Expand Up @@ -294,6 +295,7 @@ class Segment : public std::enable_shared_from_this<Segment> {
InvertedIndexFileInfo _idx_file_info;

int _be_exec_version = BeExecVersionManager::get_newest_version();
OlapReaderStatistics* _pk_index_load_stats = nullptr;
};

} // namespace segment_v2
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/segment_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "olap/segment_loader.h"

#include <butil/time.h>

#include "common/config.h"
#include "common/status.h"
#include "olap/olap_define.h"
Expand Down Expand Up @@ -52,7 +54,8 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) {

Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
SegmentCacheHandle* cache_handle, bool use_cache,
bool need_load_pk_index_and_bf) {
bool need_load_pk_index_and_bf,
OlapReaderStatistics* index_load_stats) {
if (cache_handle->is_inited()) {
return Status::OK();
}
Expand All @@ -70,7 +73,7 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
segment_v2::SegmentSharedPtr segment;
RETURN_IF_ERROR(rowset->load_segment(i, &segment));
if (need_load_pk_index_and_bf) {
RETURN_IF_ERROR(segment->load_pk_index_and_bf());
RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats));
}
if (use_cache && !config::disable_segment_cache) {
// memory of SegmentCache::CacheValue will be handled by SegmentCache
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/segment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class SegmentLoader {
// Load segments of "rowset", return the "cache_handle" which contains segments.
// If use_cache is true, it will be loaded from _cache.
Status load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle,
bool use_cache = false, bool need_load_pk_index_and_bf = false);
bool use_cache = false, bool need_load_pk_index_and_bf = false,
OlapReaderStatistics* index_load_stats = nullptr);

void erase_segment(const SegmentCache::CacheKey& key);

Expand Down
11 changes: 4 additions & 7 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -886,13 +886,10 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController

Status PInternalService::_tablet_fetch_data(const PTabletKeyLookupRequest* request,
PTabletKeyLookupResponse* response) {
PointQueryExecutor lookup_util;
RETURN_IF_ERROR(lookup_util.init(request, response));
RETURN_IF_ERROR(lookup_util.lookup_up());
if (VLOG_DEBUG_IS_ON) {
VLOG_DEBUG << lookup_util.print_profile();
}
LOG_EVERY_N(INFO, 500) << lookup_util.print_profile();
PointQueryExecutor executor;
RETURN_IF_ERROR(executor.init(request, response));
RETURN_IF_ERROR(executor.lookup_up());
executor.print_profile();
return Status::OK();
}

Expand Down
Loading