Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions be/src/io/cache/block_file_cache_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ struct FileCacheProfile {
struct FileCacheProfileReporter {
RuntimeProfile::Counter* num_local_io_total = nullptr;
RuntimeProfile::Counter* num_remote_io_total = nullptr;
RuntimeProfile::Counter* num_inverted_index_remote_io_total = nullptr;
RuntimeProfile::Counter* local_io_timer = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_remote = nullptr;
Expand All @@ -89,15 +88,21 @@ struct FileCacheProfileReporter {
RuntimeProfile::Counter* get_timer = nullptr;
RuntimeProfile::Counter* set_timer = nullptr;

RuntimeProfile::Counter* inverted_index_num_local_io_total = nullptr;
RuntimeProfile::Counter* inverted_index_num_remote_io_total = nullptr;
RuntimeProfile::Counter* inverted_index_bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* inverted_index_bytes_scanned_from_remote = nullptr;
RuntimeProfile::Counter* inverted_index_local_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_remote_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_io_timer = nullptr;

FileCacheProfileReporter(RuntimeProfile* profile) {
static const char* cache_profile = "FileCache";
ADD_TIMER_WITH_LEVEL(profile, cache_profile, 1);
num_local_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumLocalIOTotal", TUnit::UNIT,
cache_profile, 1);
num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumRemoteIOTotal", TUnit::UNIT,
cache_profile, 1);
num_inverted_index_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "NumInvertedIndexRemoteIOTotal", TUnit::UNIT, cache_profile, 1);
local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LocalIOUseTimer", cache_profile, 1);
remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "RemoteIOUseTimer", cache_profile, 1);
write_cache_io_timer =
Expand All @@ -117,13 +122,26 @@ struct FileCacheProfileReporter {
lock_wait_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LockWaitTimer", cache_profile, 1);
get_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "GetTimer", cache_profile, 1);
set_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "SetTimer", cache_profile, 1);

inverted_index_num_local_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexNumLocalIOTotal", TUnit::UNIT, cache_profile, 1);
inverted_index_num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexNumRemoteIOTotal", TUnit::UNIT, cache_profile, 1);
inverted_index_bytes_scanned_from_cache = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexBytesScannedFromCache", TUnit::BYTES, cache_profile, 1);
inverted_index_bytes_scanned_from_remote = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "InvertedIndexBytesScannedFromRemote", TUnit::BYTES, cache_profile, 1);
inverted_index_local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(
profile, "InvertedIndexLocalIOUseTimer", cache_profile, 1);
inverted_index_remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(
profile, "InvertedIndexRemoteIOUseTimer", cache_profile, 1);
inverted_index_io_timer =
ADD_CHILD_TIMER_WITH_LEVEL(profile, "InvertedIndexIOTimer", cache_profile, 1);
}

void update(const FileCacheStatistics* statistics) const {
COUNTER_UPDATE(num_local_io_total, statistics->num_local_io_total);
COUNTER_UPDATE(num_remote_io_total, statistics->num_remote_io_total);
COUNTER_UPDATE(num_inverted_index_remote_io_total,
statistics->num_inverted_index_remote_io_total);
COUNTER_UPDATE(local_io_timer, statistics->local_io_timer);
COUNTER_UPDATE(remote_io_timer, statistics->remote_io_timer);
COUNTER_UPDATE(write_cache_io_timer, statistics->write_cache_io_timer);
Expand All @@ -136,6 +154,18 @@ struct FileCacheProfileReporter {
COUNTER_UPDATE(lock_wait_timer, statistics->lock_wait_timer);
COUNTER_UPDATE(get_timer, statistics->get_timer);
COUNTER_UPDATE(set_timer, statistics->set_timer);

COUNTER_UPDATE(inverted_index_num_local_io_total,
statistics->inverted_index_num_local_io_total);
COUNTER_UPDATE(inverted_index_num_remote_io_total,
statistics->inverted_index_num_remote_io_total);
COUNTER_UPDATE(inverted_index_bytes_scanned_from_cache,
statistics->inverted_index_bytes_read_from_local);
COUNTER_UPDATE(inverted_index_bytes_scanned_from_remote,
statistics->inverted_index_bytes_read_from_remote);
COUNTER_UPDATE(inverted_index_local_io_timer, statistics->inverted_index_local_io_timer);
COUNTER_UPDATE(inverted_index_remote_io_timer, statistics->inverted_index_remote_io_timer);
COUNTER_UPDATE(inverted_index_io_timer, statistics->inverted_index_io_timer);
}
};

Expand Down
15 changes: 12 additions & 3 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,6 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats,
statis->num_local_io_total++;
statis->bytes_read_from_local += read_stats.bytes_read;
} else {
if (is_inverted_index) {
statis->num_inverted_index_remote_io_total++;
}
statis->num_remote_io_total++;
statis->bytes_read_from_remote += read_stats.bytes_read;
}
Expand All @@ -357,6 +354,18 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats,
statis->get_timer += read_stats.get_timer;
statis->set_timer += read_stats.set_timer;

if (is_inverted_index) {
if (read_stats.hit_cache) {
statis->inverted_index_num_local_io_total++;
statis->inverted_index_bytes_read_from_local += read_stats.bytes_read;
} else {
statis->inverted_index_num_remote_io_total++;
statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read;
}
statis->inverted_index_local_io_timer += read_stats.local_read_timer;
statis->inverted_index_remote_io_timer += read_stats.remote_read_timer;
}

g_skip_cache_num << read_stats.skip_cache;
g_skip_cache_sum << read_stats.skip_cache;
}
Expand Down
9 changes: 8 additions & 1 deletion be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ namespace io {
struct FileCacheStatistics {
int64_t num_local_io_total = 0;
int64_t num_remote_io_total = 0;
int64_t num_inverted_index_remote_io_total = 0;
int64_t local_io_timer = 0;
int64_t bytes_read_from_local = 0;
int64_t bytes_read_from_remote = 0;
Expand All @@ -51,6 +50,14 @@ struct FileCacheStatistics {
int64_t lock_wait_timer = 0;
int64_t get_timer = 0;
int64_t set_timer = 0;

int64_t inverted_index_num_local_io_total = 0;
int64_t inverted_index_num_remote_io_total = 0;
int64_t inverted_index_bytes_read_from_local = 0;
int64_t inverted_index_bytes_read_from_remote = 0;
int64_t inverted_index_local_io_timer = 0;
int64_t inverted_index_remote_io_timer = 0;
int64_t inverted_index_io_timer = 0;
};

struct IOContext {
Expand Down
57 changes: 57 additions & 0 deletions be/src/olap/inverted_index_profile.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <string>
#include <vector>

#include "olap/inverted_index_stats.h"
#include "util/runtime_profile.h"

namespace doris {

class InvertedIndexProfileReporter {
public:
InvertedIndexProfileReporter() = default;
~InvertedIndexProfileReporter() = default;

void update(RuntimeProfile* profile, const InvertedIndexStatistics* statistics) {
// Determine the iteration limit: the smaller of 20 or the size of statistics->stats
size_t iteration_limit = std::min<size_t>(20, statistics->stats.size());

for (size_t i = 0; i < iteration_limit; ++i) {
const auto& stats = statistics->stats[i];

ADD_TIMER_WITH_LEVEL(profile, hit_rows_name, 1);
auto* hit_rows = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "HitRows_" + stats.column_name,
TUnit::UNIT, hit_rows_name, 1);
COUNTER_UPDATE(hit_rows, stats.hit_rows);

ADD_TIMER_WITH_LEVEL(profile, exec_time_name, 1);
auto* exec_time = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "ExecTime_" + stats.column_name,
TUnit::TIME_NS, exec_time_name, 1);
COUNTER_UPDATE(exec_time, stats.exec_time);
}
}

private:
static constexpr const char* hit_rows_name = "HitRows";
static constexpr const char* exec_time_name = "ExecTime";
};

} // namespace doris
34 changes: 34 additions & 0 deletions be/src/olap/inverted_index_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <vector>

namespace doris {

struct InvertedIndexQueryStatistics {
std::string column_name;
int64_t hit_rows = 0;
int64_t exec_time = 0;
};

struct InvertedIndexStatistics {
std::vector<InvertedIndexQueryStatistics> stats;
};

} // namespace doris
4 changes: 4 additions & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#include "common/config.h"
#include "io/io_common.h"
#include "olap/inverted_index_stats.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset_fwd.h"
#include "util/hash_util.hpp"
Expand Down Expand Up @@ -373,9 +374,12 @@ struct OlapReaderStatistics {
int64_t inverted_index_query_bitmap_copy_timer = 0;
int64_t inverted_index_searcher_open_timer = 0;
int64_t inverted_index_searcher_search_timer = 0;
int64_t inverted_index_searcher_search_init_timer = 0;
int64_t inverted_index_searcher_search_exec_timer = 0;
int64_t inverted_index_searcher_cache_hit = 0;
int64_t inverted_index_searcher_cache_miss = 0;
int64_t inverted_index_downgrade_count = 0;
InvertedIndexStatistics inverted_index_stats;

int64_t output_index_result_column_timer = 0;
// number of segment filtered by column stat when creating seg iterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace doris::segment_v2 {
PhraseEdgeQuery::PhraseEdgeQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_io_ctx(io_ctx),
_query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()),
_max_expansions(query_options.inverted_index_max_expansions) {}

Expand Down Expand Up @@ -143,7 +144,7 @@ void PhraseEdgeQuery::find_words(const std::function<void(Term*)>& cb) {
Term* term = nullptr;
TermEnum* enumerator = nullptr;
try {
enumerator = _searcher->getReader()->terms();
enumerator = _searcher->getReader()->terms(nullptr, _io_ctx);
while (enumerator->next()) {
term = enumerator->term();
cb(term);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class PhraseEdgeQuery : public Query {

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

std::wstring _field_name;
std::vector<std::string> _terms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "phrase_prefix_query.h"

#include "CLucene/util/stringUtil.h"
#include "olap/rowset//segment_v2/inverted_index/query/prefix_query.h"

namespace doris::segment_v2 {

Expand All @@ -27,7 +26,8 @@ PhrasePrefixQuery::PhrasePrefixQuery(const std::shared_ptr<lucene::search::Index
const io::IOContext* io_ctx)
: _searcher(searcher),
_query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()),
_max_expansions(query_options.inverted_index_max_expansions) {}
_max_expansions(query_options.inverted_index_max_expansions),
_prefix_query(io_ctx) {}

void PhrasePrefixQuery::add(const std::wstring& field_name, const std::vector<std::string>& terms) {
if (terms.empty()) {
Expand All @@ -42,8 +42,8 @@ void PhrasePrefixQuery::add(const std::wstring& field_name, const std::vector<st
_CLLDECDELETE(t);
} else {
std::vector<CL_NS(index)::Term*> prefix_terms;
PrefixQuery::get_prefix_terms(_searcher->getReader(), field_name, terms[i],
prefix_terms, _max_expansions);
_prefix_query.get_prefix_terms(_searcher->getReader(), field_name, terms[i],
prefix_terms, _max_expansions);
if (prefix_terms.empty()) {
std::wstring ws_term = StringUtil::string_to_wstring(terms[i]);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "CLucene/search/MultiPhraseQuery.h"
// clang-format on

#include "olap/rowset/segment_v2/inverted_index/query/prefix_query.h"

CL_NS_USE(search)

namespace doris::segment_v2 {
Expand All @@ -42,6 +44,7 @@ class PhrasePrefixQuery : public Query {

std::unique_ptr<CL_NS(search)::MultiPhraseQuery> _query;
int32_t _max_expansions = 50;
PrefixQuery _prefix_query;
};

} // namespace doris::segment_v2
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@

namespace doris::segment_v2 {

PrefixQuery::PrefixQuery(const io::IOContext* io_ctx) : _io_ctx(io_ctx) {}

void PrefixQuery::get_prefix_terms(IndexReader* reader, const std::wstring& field_name,
const std::string& prefix,
std::vector<CL_NS(index)::Term*>& prefix_terms,
int32_t max_expansions) {
std::wstring ws_prefix = StringUtil::string_to_wstring(prefix);

Term* prefix_term = _CLNEW Term(field_name.c_str(), ws_prefix.c_str());
TermEnum* enumerator = reader->terms(prefix_term);
TermEnum* enumerator = reader->terms(prefix_term, _io_ctx);

int32_t count = 0;
Term* lastTerm = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@

#pragma once

#include <CLucene.h>
#include <CLucene/index/IndexReader.h>

#include <cstdint>
#include "olap/rowset/segment_v2/inverted_index/query/query.h"

CL_NS_USE(index)

namespace doris::segment_v2 {

class PrefixQuery {
public:
PrefixQuery() = default;
PrefixQuery(const io::IOContext* io_ctx);
virtual ~PrefixQuery() = default;

static void get_prefix_terms(IndexReader* reader, const std::wstring& field_name,
const std::string& prefix,
std::vector<CL_NS(index)::Term*>& prefix_terms,
int32_t max_expansions = 50);
void get_prefix_terms(IndexReader* reader, const std::wstring& field_name,
const std::string& prefix, std::vector<CL_NS(index)::Term*>& prefix_terms,
int32_t max_expansions = 50);

private:
const io::IOContext* _io_ctx = nullptr;
};

} // namespace doris::segment_v2
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace doris::segment_v2 {
RegexpQuery::RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_io_ctx(io_ctx),
_max_expansions(query_options.inverted_index_max_expansions),
_query(searcher, query_options, io_ctx) {}

Expand Down Expand Up @@ -66,7 +67,7 @@ void RegexpQuery::add(const std::wstring& field_name, const std::vector<std::str
int32_t count = 0;

try {
enumerator = _searcher->getReader()->terms();
enumerator = _searcher->getReader()->terms(nullptr, _io_ctx);
while (enumerator->next()) {
term = enumerator->term();
std::string input = lucene_wcstoutf8string(term->text(), term->textLength());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class RegexpQuery : public Query {

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

int32_t _max_expansions = 50;
DisjunctionQuery _query;
Expand Down
Loading
Loading