Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions be/src/io/cache/block_file_cache_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ struct FileCacheProfile {
struct FileCacheProfileReporter {
RuntimeProfile::Counter* num_local_io_total = nullptr;
RuntimeProfile::Counter* num_remote_io_total = nullptr;
RuntimeProfile::Counter* num_inverted_index_remote_io_total = nullptr;
RuntimeProfile::Counter* local_io_timer = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_remote = nullptr;
Expand All @@ -89,15 +88,20 @@ struct FileCacheProfileReporter {
RuntimeProfile::Counter* get_timer = nullptr;
RuntimeProfile::Counter* set_timer = nullptr;

RuntimeProfile::Counter* inverted_index_num_local_io_total = nullptr;
RuntimeProfile::Counter* inverted_index_num_remote_io_total = nullptr;
RuntimeProfile::Counter* inverted_index_bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* inverted_index_bytes_scanned_from_remote = nullptr;
RuntimeProfile::Counter* inverted_index_local_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_remote_io_timer = nullptr;

FileCacheProfileReporter(RuntimeProfile* profile) {
static const char* cache_profile = "FileCache";
ADD_TIMER_WITH_LEVEL(profile, cache_profile, 1);
num_local_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumLocalIOTotal", TUnit::UNIT,
cache_profile, 1);
num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumRemoteIOTotal", TUnit::UNIT,
cache_profile, 1);
num_inverted_index_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "NumInvertedIndexRemoteIOTotal", TUnit::UNIT, cache_profile, 1);
local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LocalIOUseTimer", cache_profile, 1);
remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "RemoteIOUseTimer", cache_profile, 1);
write_cache_io_timer =
Expand All @@ -117,13 +121,24 @@ struct FileCacheProfileReporter {
lock_wait_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LockWaitTimer", cache_profile, 1);
get_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "GetTimer", cache_profile, 1);
set_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "SetTimer", cache_profile, 1);

inverted_index_num_local_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexNumLocalIOTotal", TUnit::UNIT, cache_profile, 1);
inverted_index_num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexNumRemoteIOTotal", TUnit::UNIT, cache_profile, 1);
inverted_index_bytes_scanned_from_cache = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexBytesScannedFromCache", TUnit::BYTES, cache_profile, 1);
inverted_index_bytes_scanned_from_remote = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexBytesScannedFromRemote", TUnit::BYTES, cache_profile, 1);
inverted_index_local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(
profile, "InvertedIndexLocalIOUseTimer", cache_profile, 1);
inverted_index_remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(
profile, "InvertedIndexRemoteIOUseTimer", cache_profile, 1);
}

void update(const FileCacheStatistics* statistics) const {
COUNTER_UPDATE(num_local_io_total, statistics->num_local_io_total);
COUNTER_UPDATE(num_remote_io_total, statistics->num_remote_io_total);
COUNTER_UPDATE(num_inverted_index_remote_io_total,
statistics->num_inverted_index_remote_io_total);
COUNTER_UPDATE(local_io_timer, statistics->local_io_timer);
COUNTER_UPDATE(remote_io_timer, statistics->remote_io_timer);
COUNTER_UPDATE(write_cache_io_timer, statistics->write_cache_io_timer);
Expand All @@ -136,6 +151,17 @@ struct FileCacheProfileReporter {
COUNTER_UPDATE(lock_wait_timer, statistics->lock_wait_timer);
COUNTER_UPDATE(get_timer, statistics->get_timer);
COUNTER_UPDATE(set_timer, statistics->set_timer);

COUNTER_UPDATE(inverted_index_num_local_io_total,
statistics->inverted_index_num_local_io_total);
COUNTER_UPDATE(inverted_index_num_remote_io_total,
statistics->inverted_index_num_remote_io_total);
COUNTER_UPDATE(inverted_index_bytes_scanned_from_cache,
statistics->inverted_index_bytes_read_from_local);
COUNTER_UPDATE(inverted_index_bytes_scanned_from_remote,
statistics->inverted_index_bytes_read_from_remote);
COUNTER_UPDATE(inverted_index_local_io_timer, statistics->inverted_index_local_io_timer);
COUNTER_UPDATE(inverted_index_remote_io_timer, statistics->inverted_index_remote_io_timer);
}
};

Expand Down
15 changes: 12 additions & 3 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,6 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats,
statis->num_local_io_total++;
statis->bytes_read_from_local += read_stats.bytes_read;
} else {
if (is_inverted_index) {
statis->num_inverted_index_remote_io_total++;
}
statis->num_remote_io_total++;
statis->bytes_read_from_remote += read_stats.bytes_read;
}
Expand All @@ -357,6 +354,18 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats,
statis->get_timer += read_stats.get_timer;
statis->set_timer += read_stats.set_timer;

if (is_inverted_index) {
if (read_stats.hit_cache) {
statis->inverted_index_num_local_io_total++;
statis->inverted_index_bytes_read_from_local += read_stats.bytes_read;
} else {
statis->inverted_index_num_remote_io_total++;
statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read;
}
statis->inverted_index_local_io_timer += read_stats.local_read_timer;
statis->inverted_index_remote_io_timer += read_stats.remote_read_timer;
}

g_skip_cache_num << read_stats.skip_cache;
g_skip_cache_sum << read_stats.skip_cache;
}
Expand Down
8 changes: 7 additions & 1 deletion be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ namespace io {
struct FileCacheStatistics {
int64_t num_local_io_total = 0;
int64_t num_remote_io_total = 0;
int64_t num_inverted_index_remote_io_total = 0;
int64_t local_io_timer = 0;
int64_t bytes_read_from_local = 0;
int64_t bytes_read_from_remote = 0;
Expand All @@ -51,6 +50,13 @@ struct FileCacheStatistics {
int64_t lock_wait_timer = 0;
int64_t get_timer = 0;
int64_t set_timer = 0;

int64_t inverted_index_num_local_io_total = 0;
int64_t inverted_index_num_remote_io_total = 0;
int64_t inverted_index_bytes_read_from_local = 0;
int64_t inverted_index_bytes_read_from_remote = 0;
int64_t inverted_index_local_io_timer = 0;
int64_t inverted_index_remote_io_timer = 0;
};

struct IOContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ ConjunctionQuery::ConjunctionQuery(const std::shared_ptr<lucene::search::IndexSe
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_index_version(_searcher->getReader()->getIndexVersion()),
_conjunction_ratio(query_options.inverted_index_conjunction_opt_threshold) {}
_conjunction_ratio(query_options.inverted_index_conjunction_opt_threshold),
_io_ctx(io_ctx) {}

void ConjunctionQuery::add(const InvertedIndexQueryInfo& query_info) {
if (query_info.terms.empty()) {
Expand All @@ -32,8 +33,8 @@ void ConjunctionQuery::add(const InvertedIndexQueryInfo& query_info) {

std::vector<TermIterator> iterators;
for (const auto& term : query_info.terms) {
auto* term_doc =
TermIterator::ensure_term_doc(_searcher->getReader(), query_info.field_name, term);
auto* term_doc = TermIterator::ensure_term_doc(_io_ctx, _searcher->getReader(),
query_info.field_name, term);
iterators.emplace_back(term_doc);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ConjunctionQuery : public Query {

IndexVersion _index_version = IndexVersion::kV0;
int32_t _conjunction_ratio = 1000;
const io::IOContext* _io_ctx = nullptr;
bool _use_skip = false;

TermIterator _lead1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace doris::segment_v2 {

DisjunctionQuery::DisjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher) {}
: _searcher(searcher), _io_ctx(io_ctx) {}

void DisjunctionQuery::add(const InvertedIndexQueryInfo& query_info) {
if (query_info.terms.empty()) {
Expand All @@ -34,7 +34,8 @@ void DisjunctionQuery::add(const InvertedIndexQueryInfo& query_info) {

void DisjunctionQuery::search(roaring::Roaring& roaring) {
auto func = [this, &roaring](const std::string& term, bool first) {
auto* term_doc = TermIterator::ensure_term_doc(_searcher->getReader(), _field_name, term);
auto* term_doc =
TermIterator::ensure_term_doc(_io_ctx, _searcher->getReader(), _field_name, term);
TermIterator iterator(term_doc);

DocRange doc_range;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class DisjunctionQuery : public Query {

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

std::wstring _field_name;
std::vector<std::string> _terms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace doris::segment_v2 {
// Constructs a phrase-edge query bound to `searcher`.
//
// searcher:      index searcher whose reader is used to enumerate terms.
// query_options: supplies `inverted_index_max_expansions`, copied into
//                `_max_expansions`.
// io_ctx:        IO context pointer stored as-is in `_io_ctx`; find_words()
//                passes it to the reader's terms() call.
//                NOTE(review): stored as a raw pointer — presumably the caller
//                keeps it alive for the lifetime of this query; confirm.
PhraseEdgeQuery::PhraseEdgeQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_io_ctx(io_ctx),
// The underlying multi-phrase query object is created eagerly here.
_query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()),
_max_expansions(query_options.inverted_index_max_expansions) {}

Expand Down Expand Up @@ -143,7 +144,7 @@ void PhraseEdgeQuery::find_words(const std::function<void(Term*)>& cb) {
Term* term = nullptr;
TermEnum* enumerator = nullptr;
try {
enumerator = _searcher->getReader()->terms();
enumerator = _searcher->getReader()->terms(nullptr, _io_ctx);
while (enumerator->next()) {
term = enumerator->term();
cb(term);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class PhraseEdgeQuery : public Query {
void find_words(const std::function<void(Term*)>& cb);

std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

std::wstring _field_name;
std::vector<std::string> _terms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ void PhrasePrefixQuery::add(const InvertedIndexQueryInfo& query_info) {
std::wstring ws = StringUtil::string_to_wstring(query_info.terms[i]);
terms[i].emplace_back(ws);
} else {
PrefixQuery::get_prefix_terms(_searcher->getReader(), query_info.field_name,
query_info.terms[i], terms[i], _max_expansions);
_prefix_query.get_prefix_terms(_searcher->getReader(), query_info.field_name,
query_info.terms[i], terms[i], _max_expansions);
if (terms[i].empty()) {
std::wstring ws = StringUtil::string_to_wstring(query_info.terms[i]);
terms[i].emplace_back(ws);
Expand Down
20 changes: 10 additions & 10 deletions be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace doris::segment_v2 {

PhraseQuery::PhraseQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher) {}
: _searcher(searcher), _io_ctx(io_ctx) {}

void PhraseQuery::add(const InvertedIndexQueryInfo& query_info) {
if (query_info.terms.empty()) {
Expand All @@ -38,7 +38,7 @@ void PhraseQuery::add(const InvertedIndexQueryInfo& query_info) {

if (query_info.terms.size() == 1) {
auto* term_pos = TermPositionIterator::ensure_term_position(
_searcher->getReader(), query_info.field_name, query_info.terms[0]);
_io_ctx, _searcher->getReader(), query_info.field_name, query_info.terms[0]);
_iterators.emplace_back(std::make_shared<TermPositionIterator>(term_pos));
_lead1 = &_iterators.at(0);
return;
Expand Down Expand Up @@ -90,7 +90,7 @@ void PhraseQuery::init_exact_phrase_matcher(const InvertedIndexQueryInfo& query_
std::vector<PostingsAndPosition> postings;
for (size_t i = 0; i < query_info.terms.size(); i++) {
const auto& term = query_info.terms[i];
auto* term_pos = TermPositionIterator::ensure_term_position(_searcher->getReader(),
auto* term_pos = TermPositionIterator::ensure_term_position(_io_ctx, _searcher->getReader(),
query_info.field_name, term);
auto iter = std::make_shared<TermPositionIterator>(term_pos);
_iterators.emplace_back(iter);
Expand All @@ -106,16 +106,16 @@ void PhraseQuery::init_exact_phrase_matcher(const std::wstring& field_name,
for (size_t i = 0; i < terms.size(); i++) {
if (i < terms.size() - 1) {
const auto& term = terms[i][0];
auto* term_pos = TermPositionIterator::ensure_term_position(_searcher->getReader(),
field_name, term);
auto* term_pos = TermPositionIterator::ensure_term_position(
_io_ctx, _searcher->getReader(), field_name, term);
auto iter = std::make_shared<TermPositionIterator>(term_pos);
_iterators.emplace_back(iter);
postings.emplace_back(iter, i);
} else {
std::vector<TermPositionIterator> subs;
for (const auto& term : terms[i]) {
auto* term_pos = TermPositionIterator::ensure_term_position(_searcher->getReader(),
field_name, term);
auto* term_pos = TermPositionIterator::ensure_term_position(
_io_ctx, _searcher->getReader(), field_name, term);
subs.emplace_back(term_pos);
}
auto iter = std::make_shared<UnionTermIterator<TermPositionIterator>>(std::move(subs));
Expand All @@ -131,7 +131,7 @@ void PhraseQuery::init_sloppy_phrase_matcher(const InvertedIndexQueryInfo& query
std::vector<PostingsAndFreq> postings;
for (size_t i = 0; i < query_info.terms.size(); i++) {
const auto& term = query_info.terms[i];
auto* term_pos = TermPositionIterator::ensure_term_position(_searcher->getReader(),
auto* term_pos = TermPositionIterator::ensure_term_position(_io_ctx, _searcher->getReader(),
query_info.field_name, term);
auto iter = std::make_shared<TermPositionIterator>(term_pos);
_iterators.emplace_back(iter);
Expand All @@ -147,7 +147,7 @@ void PhraseQuery::init_ordered_sloppy_phrase_matcher(const InvertedIndexQueryInf
for (size_t i = 0; i < query_info.terms.size(); i++) {
const auto& term = query_info.terms[i];
auto* term_pos = TermPositionIterator::ensure_term_position(
_searcher->getReader(), query_info.field_name, term);
_io_ctx, _searcher->getReader(), query_info.field_name, term);
auto iter = std::make_shared<TermPositionIterator>(term_pos);
_iterators.emplace_back(iter);
postings.emplace_back(iter, i);
Expand All @@ -161,7 +161,7 @@ void PhraseQuery::init_ordered_sloppy_phrase_matcher(const InvertedIndexQueryInf
for (size_t i = 0; i < terms.size(); i++) {
const auto& term = terms[i];
auto* term_pos = TermPositionIterator::ensure_term_position(
_searcher->getReader(), query_info.field_name, term);
_io_ctx, _searcher->getReader(), query_info.field_name, term);
auto iter = std::make_shared<TermPositionIterator>(term_pos);
postings.emplace_back(iter, i);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class PhraseQuery : public Query {

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

DISI* _lead1 = nullptr;
DISI* _lead2 = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace doris::segment_v2 {

PrefixQuery::PrefixQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher) {}
: _searcher(searcher), _io_ctx(io_ctx) {}

void PrefixQuery::add(const std::wstring& field_name, const std::vector<std::wstring>& terms) {
if (terms.empty()) {
Expand All @@ -30,7 +30,7 @@ void PrefixQuery::add(const std::wstring& field_name, const std::vector<std::wst

std::vector<TermPositionIterator> subs;
for (const auto& ws_term : terms) {
auto* term_doc = TermPositionIterator::ensure_term_position(_searcher->getReader(),
auto* term_doc = TermPositionIterator::ensure_term_position(_io_ctx, _searcher->getReader(),
field_name, ws_term);
subs.emplace_back(term_doc);
}
Expand All @@ -50,7 +50,7 @@ void PrefixQuery::get_prefix_terms(IndexReader* reader, const std::wstring& fiel
std::wstring ws_prefix = StringUtil::string_to_wstring(prefix);

Term* prefix_term = _CLNEW Term(field_name.c_str(), ws_prefix.c_str());
TermEnum* enumerator = reader->terms(prefix_term);
TermEnum* enumerator = reader->terms(prefix_term, _io_ctx);

int32_t count = 0;
Term* lastTerm = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ class PrefixQuery : public Query {
void add(const std::wstring& field_name, const std::vector<std::wstring>& terms);
void search(roaring::Roaring& roaring) override;

static void get_prefix_terms(IndexReader* reader, const std::wstring& field_name,
const std::string& prefix, std::vector<std::wstring>& prefix_terms,
int32_t max_expansions = 50);
void get_prefix_terms(IndexReader* reader, const std::wstring& field_name,
const std::string& prefix, std::vector<std::wstring>& prefix_terms,
int32_t max_expansions = 50);

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

UnionTermIterPtr _lead1;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace doris::segment_v2 {
// Constructs a regexp query bound to `searcher`.
//
// searcher:      index searcher whose reader is used to enumerate candidate terms.
// query_options: supplies `inverted_index_max_expansions`, copied into
//                `_max_expansions`; also forwarded to the inner `_query`.
// io_ctx:        IO context pointer stored as-is in `_io_ctx` (passed to the
//                reader's terms() calls in collect_matching_terms()) and also
//                forwarded to the inner `_query`.
//                NOTE(review): stored as a raw pointer — presumably the caller
//                keeps it alive for the lifetime of this query; confirm.
RegexpQuery::RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_io_ctx(io_ctx),
_max_expansions(query_options.inverted_index_max_expansions),
// The inner query receives the same searcher, options and IO context.
_query(searcher, query_options, io_ctx) {}

Expand Down Expand Up @@ -129,9 +130,9 @@ void RegexpQuery::collect_matching_terms(const std::wstring& field_name,
if (prefix) {
std::wstring ws_prefix = StringUtil::string_to_wstring(*prefix);
Term prefix(field_name.c_str(), ws_prefix.c_str());
enumerator = _searcher->getReader()->terms(&prefix);
enumerator = _searcher->getReader()->terms(&prefix, _io_ctx);
} else {
enumerator = _searcher->getReader()->terms();
enumerator = _searcher->getReader()->terms(nullptr, _io_ctx);
enumerator->next();
}
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class RegexpQuery : public Query {
const std::optional<std::string>& prefix);

std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

int32_t _max_expansions = 50;
DisjunctionQuery _query;
Expand Down
Loading
Loading