Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "exec/schema_scanner/schema_collations_scanner.h"
#include "exec/schema_scanner/schema_columns_scanner.h"
#include "exec/schema_scanner/schema_dummy_scanner.h"
#include "exec/schema_scanner/schema_file_cache_statistics.h"
#include "exec/schema_scanner/schema_files_scanner.h"
#include "exec/schema_scanner/schema_metadata_name_ids_scanner.h"
#include "exec/schema_scanner/schema_partitions_scanner.h"
Expand Down Expand Up @@ -224,6 +225,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return SchemaUserScanner::create_unique();
case TSchemaTableType::SCH_WORKLOAD_POLICY:
return SchemaWorkloadSchedulePolicyScanner::create_unique();
case TSchemaTableType::SCH_FILE_CACHE_STATISTICS:
return SchemaFileCacheStatisticsScanner::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
Expand Down
88 changes: 88 additions & 0 deletions be/src/exec/schema_scanner/schema_file_cache_statistics.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/schema_scanner/schema_file_cache_statistics.h"

#include "io/cache/block/block_file_cache_factory.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_factory.hpp"

namespace doris {

// Column layout of the file cache statistics schema table. Each row produced by this
// scanner carries one metric (METRIC_NAME / METRIC_VALUE) of one cache directory
// (CACHE_PATH) on one backend (BE_ID / BE_IP). The order here must match the column
// positions 0..4 that FileCacheFactory::get_cache_stats_block() writes to.
std::vector<SchemaScanner::ColumnDesc> SchemaFileCacheStatisticsScanner::_s_tbls_columns = {
// name, type, size, and a trailing bool
// NOTE(review): the bool is presumably the "is nullable" flag of SchemaScanner::ColumnDesc
// -- confirm against its declaration.
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"BE_IP", TYPE_VARCHAR, sizeof(StringRef), false},
{"CACHE_PATH", TYPE_VARCHAR, sizeof(StringRef), false},
{"METRIC_NAME", TYPE_VARCHAR, sizeof(StringRef), false},
{"METRIC_VALUE", TYPE_DOUBLE, sizeof(double), false}};

// Builds the scanner over the fixed column layout in `_s_tbls_columns` and tags it with
// the SCH_FILE_CACHE_STATISTICS schema table type.
SchemaFileCacheStatisticsScanner::SchemaFileCacheStatisticsScanner()
        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_FILE_CACHE_STATISTICS) {
}

// Nothing to release by hand: the members clean up after themselves. The destructor is
// still defined out of line, in this translation unit.
SchemaFileCacheStatisticsScanner::~SchemaFileCacheStatisticsScanner() = default;

// Records the query's batch size as the maximum number of rows handed out per call to
// get_next_block_internal(). Always returns OK.
Status SchemaFileCacheStatisticsScanner::start(RuntimeState* state) {
    const auto rows_per_batch = state->batch_size();
    _block_rows_limit = rows_per_batch;
    return Status::OK();
}

// Hands out the file cache statistics in batches of at most `_block_rows_limit` rows.
//
// On the first call the complete statistics snapshot is materialised into `_stats_block`
// (one nullable column per entry of `_s_tbls_columns`, filled by
// FileCacheFactory::get_cache_stats_block()). Every call then appends the next slice of
// that snapshot to `block` and advances `_row_idx`.
//
// @param block  destination block; rows are appended to it. Must not be null.
// @param eos    set to true once every snapshot row has been handed out. Must not be null.
// @return InternalError if the scanner is used before initialisation or if a pointer
//         argument is null; otherwise OK or the error reported by MutableBlock::add_rows().
Status SchemaFileCacheStatisticsScanner::get_next_block_internal(vectorized::Block* block,
                                                                 bool* eos) {
    if (!_is_init) {
        return Status::InternalError("Used before initialized.");
    }

    if (nullptr == block || nullptr == eos) {
        return Status::InternalError("input pointer is nullptr.");
    }

    if (_stats_block == nullptr) {
        _stats_block = vectorized::Block::create_unique();

        // Range-for instead of an index loop: avoids comparing a signed index against the
        // unsigned container size.
        for (const auto& column_desc : _s_tbls_columns) {
            TypeDescriptor descriptor(column_desc.type);
            // `true` requests a nullable type; get_cache_stats_block() relies on every
            // column being a ColumnNullable.
            auto data_type =
                    vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
            _stats_block->insert(vectorized::ColumnWithTypeAndName(
                    data_type->create_column(), data_type, column_desc.name));
        }

        _stats_block->reserve(_block_rows_limit);

        ExecEnv::GetInstance()->file_cache_factory()->get_cache_stats_block(_stats_block.get());
        // The row count is tracked as int to match `_row_idx` / `_block_rows_limit`; make
        // the narrowing explicit.
        _total_rows = static_cast<int>(_stats_block->rows());
    }

    // `>=` rather than `==` so the scan still terminates if the cursor ever ends up past
    // the end of the snapshot.
    if (_row_idx >= _total_rows) {
        *eos = true;
        return Status::OK();
    }

    const int remaining_rows = _total_rows - _row_idx;
    const int current_batch_rows = std::min(_block_rows_limit, remaining_rows);
    vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
    RETURN_IF_ERROR(mblock.add_rows(_stats_block.get(), _row_idx, current_batch_rows));
    _row_idx += current_batch_rows;

    *eos = _row_idx >= _total_rows;
    return Status::OK();
}

} // namespace doris
49 changes: 49 additions & 0 deletions be/src/exec/schema_scanner/schema_file_cache_statistics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <vector>

#include "common/status.h"
#include "exec/schema_scanner.h"

namespace doris {
class RuntimeState;
namespace vectorized {
class Block;
} // namespace vectorized

// Schema scanner that exposes the backend's file cache metrics as rows of
// (BE_ID, BE_IP, CACHE_PATH, METRIC_NAME, METRIC_VALUE).
class SchemaFileCacheStatisticsScanner : public SchemaScanner {
    ENABLE_FACTORY_CREATOR(SchemaFileCacheStatisticsScanner);

public:
    SchemaFileCacheStatisticsScanner();
    ~SchemaFileCacheStatisticsScanner() override;

    // Picks up the per-call row limit from the runtime state.
    Status start(RuntimeState* state) override;

    // Appends the next batch of statistics rows to `block`; sets `*eos` when exhausted.
    Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

    // Column layout of the schema table served by this scanner.
    static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

private:
    // Upper bound on rows returned per get_next_block_internal() call.
    int _block_rows_limit {4096};
    // Index of the next row of `_stats_block` to hand out.
    int _row_idx {0};
    // Number of rows in `_stats_block` once it has been filled.
    int _total_rows {0};
    // Snapshot of all statistics rows; stays null until the first fetch builds it.
    std::unique_ptr<vectorized::Block> _stats_block;
};
}; // namespace doris
1 change: 1 addition & 0 deletions be/src/io/cache/block/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class IFileCache {

virtual void change_cache_type(const Key& key, size_t offset, CacheType new_type,
std::lock_guard<std::mutex>& cache_lock) = 0;
virtual std::map<std::string, double> get_stats() = 0;

static std::string_view cache_type_to_string(CacheType type);
static CacheType string_to_cache_type(const std::string& str);
Expand Down
52 changes: 52 additions & 0 deletions be/src/io/cache/block/block_file_cache_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include "io/cache/block/block_lru_file_cache.h"
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "vec/core/block.h"

namespace doris {
class TUniqueId;
Expand Down Expand Up @@ -112,5 +114,55 @@ std::vector<IFileCache::QueryFileCacheContextHolderPtr> FileCacheFactory::get_qu
return holders;
}

// Appends one row per (cache instance, metric) to `block`.
//
// `block` must already contain five nullable columns in this order:
//   0: BE id (Int64), 1: BE ip (String), 2: cache path (String),
//   3: metric name (String), 4: metric value (Float64).
// Every appended cell is marked non-null.
//
// NOTE(review): `_caches` is iterated without taking a lock here -- confirm that no cache
// can be registered concurrently with a statistics query.
void FileCacheFactory::get_cache_stats_block(vectorized::Block* block) {
    const TBackend be = BackendOptions::get_local_backend();
    const int64_t be_id = be.id;
    const std::string& be_ip = be.host;

    // Shared part of every insert: fetch column `col_index` as a mutable nullable column,
    // let `insert_into_nested` append the value to the nested column, then append a
    // "not null" marker to the null map so both stay the same length.
    auto append_non_null = [block](int col_index, auto&& insert_into_nested) {
        vectorized::MutableColumnPtr mutable_col_ptr =
                std::move(*block->get_by_position(col_index).column).assume_mutable();
        auto* nullable_column = static_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
        insert_into_nested(nullable_column->get_nested_column());
        nullable_column->get_null_map_data().emplace_back(0);
    };

    auto insert_int_value = [&append_non_null](int col_index, int64_t int_val) {
        append_non_null(col_index, [int_val](vectorized::IColumn& nested) {
            static_cast<vectorized::ColumnVector<vectorized::Int64>&>(nested).insert_value(
                    int_val);
        });
    };

    auto insert_double_value = [&append_non_null](int col_index, double double_val) {
        append_non_null(col_index, [double_val](vectorized::IColumn& nested) {
            static_cast<vectorized::ColumnVector<vectorized::Float64>&>(nested).insert_value(
                    double_val);
        });
    };

    // Takes the string by const reference: the bytes are copied into the column anyway,
    // so an extra std::string copy per cell would be pure overhead.
    auto insert_string_value = [&append_non_null](int col_index, const std::string& str_val) {
        append_non_null(col_index, [&str_val](vectorized::IColumn& nested) {
            static_cast<vectorized::ColumnString&>(nested).insert_data(str_val.data(),
                                                                       str_val.size());
        });
    };

    for (auto& cache : _caches) {
        const std::map<std::string, double> stats = cache->get_stats();
        // The path is the same for every metric of this cache; look it up once.
        const std::string& cache_path = cache->get_base_path();
        for (const auto& [metric_name, metric_value] : stats) {
            insert_int_value(0, be_id);              // be id
            insert_string_value(1, be_ip);           // be ip
            insert_string_value(2, cache_path);      // cache path
            insert_string_value(3, metric_name);     // metric name
            insert_double_value(4, metric_value);    // metric value
        }
    }
}
} // namespace io
} // namespace doris
6 changes: 6 additions & 0 deletions be/src/io/cache/block/block_file_cache_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
namespace doris {
class TUniqueId;

namespace vectorized {
class Block;
} // namespace vectorized

namespace io {

/**
Expand All @@ -54,6 +58,8 @@ class FileCacheFactory {
FileCacheFactory& operator=(const FileCacheFactory&) = delete;
FileCacheFactory(const FileCacheFactory&) = delete;

void get_cache_stats_block(vectorized::Block* block);

private:
// to protect following containers
std::mutex _cache_mutex;
Expand Down
69 changes: 65 additions & 4 deletions be/src/io/cache/block/block_lru_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ namespace doris {
namespace io {

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_hits_ratio, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_hits_ratio_5m, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_hits_ratio_1h, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_removed_elements, MetricUnit::OPERATIONS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_max_size, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_index_queue_curr_size, MetricUnit::BYTES);
Expand All @@ -74,6 +76,17 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_max_elements, Met
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_disposable_queue_curr_elements, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(file_cache_segment_reader_cache_size, MetricUnit::NOUNIT);

// Cumulative counters of file blocks requested through the cache ("read") and of those
// that were already in DOWNLOADED state when requested ("hit").
// NOTE(review): these are namespace-scope variables, so they are shared by every
// LRUFileCache instance in the process; the hit ratios derived from them are therefore
// process-wide rather than per cache path -- confirm this is intended.
bvar::Adder<int64_t> g_file_cache_num_read_segments("doris_file_cache_num_read_segments");
bvar::Adder<int64_t> g_file_cache_num_hit_segments("doris_file_cache_num_hit_segments");
// Windowed views over the two counters above, used for the *_5m and *_1h hit ratios.
// The second constructor argument is the window length (300 and 3600; presumably
// seconds, matching the 5m / 1h suffixes -- see the bvar::Window documentation).
bvar::Window<bvar::Adder<int64_t>> g_file_cache_num_hit_segments_5m(&g_file_cache_num_hit_segments,
300);
bvar::Window<bvar::Adder<int64_t>> g_file_cache_num_read_segments_5m(
&g_file_cache_num_read_segments, 300);
bvar::Window<bvar::Adder<int64_t>> g_file_cache_num_hit_segments_1h(&g_file_cache_num_hit_segments,
3600);
bvar::Window<bvar::Adder<int64_t>> g_file_cache_num_read_segments_1h(
&g_file_cache_num_read_segments, 3600);

LRUFileCache::LRUFileCache(const std::string& cache_base_path,
const FileCacheSettings& cache_settings)
: IFileCache(cache_base_path, cache_settings) {
Expand All @@ -89,6 +102,8 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path,
_entity->register_hook(_cache_base_path, std::bind(&LRUFileCache::update_cache_metrics, this));

INT_DOUBLE_METRIC_REGISTER(_entity, file_cache_hits_ratio);
INT_DOUBLE_METRIC_REGISTER(_entity, file_cache_hits_ratio_5m);
INT_DOUBLE_METRIC_REGISTER(_entity, file_cache_hits_ratio_1h);
INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_removed_elements);

INT_UGAUGE_METRIC_REGISTER(_entity, file_cache_index_queue_max_size);
Expand Down Expand Up @@ -416,10 +431,10 @@ FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t
}

DCHECK(!file_blocks.empty());
_num_read_segments += file_blocks.size();
g_file_cache_num_read_segments << file_blocks.size();
for (auto& segment : file_blocks) {
if (segment->state() == FileBlock::State::DOWNLOADED) {
_num_hit_segments++;
g_file_cache_num_hit_segments << 1;
}
}
return FileBlocksHolder(std::move(file_blocks));
Expand Down Expand Up @@ -1147,11 +1162,28 @@ void LRUFileCache::run_background_operation() {
void LRUFileCache::update_cache_metrics() const {
std::lock_guard<std::mutex> l(_mutex);
double hit_ratio = 0;
if (_num_read_segments > 0) {
hit_ratio = (double)_num_hit_segments / (double)_num_read_segments;
double hit_ratio_5m = 0;
double hit_ratio_1h = 0;
if (g_file_cache_num_read_segments.get_value() > 0) {
hit_ratio = ((double)g_file_cache_num_hit_segments.get_value()) /
((double)g_file_cache_num_read_segments.get_value());
}
if (g_file_cache_num_read_segments_5m.get_value() > 0) {
hit_ratio_5m = ((double)g_file_cache_num_hit_segments_5m.get_value()) /
((double)g_file_cache_num_read_segments_5m.get_value());
} else {
hit_ratio_5m = 0.0;
}
if (g_file_cache_num_read_segments_1h.get_value() > 0) {
hit_ratio_1h = ((double)g_file_cache_num_hit_segments_1h.get_value()) /
((double)g_file_cache_num_read_segments_1h.get_value());
} else {
hit_ratio_1h = 0.0;
}

file_cache_hits_ratio->set_value(hit_ratio);
file_cache_hits_ratio_5m->set_value(hit_ratio_5m);
file_cache_hits_ratio_1h->set_value(hit_ratio_1h);
file_cache_removed_elements->set_value(_num_removed_segments);

file_cache_index_queue_max_size->set_value(_index_queue.get_max_size());
Expand All @@ -1171,5 +1203,34 @@ void LRUFileCache::update_cache_metrics() const {
file_cache_segment_reader_cache_size->set_value(IFileCache::file_reader_cache_size());
}

std::map<std::string, double> LRUFileCache::get_stats() {
update_cache_metrics();
std::map<std::string, double> stats;
stats["hits_ratio"] = (double)file_cache_hits_ratio->value();
stats["hits_ratio_5m"] = (double)file_cache_hits_ratio_5m->value();
stats["hits_ratio_1h"] = (double)file_cache_hits_ratio_1h->value();

stats["index_queue_max_size"] = (double)file_cache_index_queue_max_size->value();
stats["index_queue_curr_size"] = (double)file_cache_index_queue_curr_size->value();
stats["index_queue_max_elements"] = (double)file_cache_index_queue_max_elements->value();
stats["index_queue_curr_elements"] = (double)file_cache_index_queue_curr_elements->value();

stats["normal_queue_max_size"] = (double)file_cache_normal_queue_max_size->value();
stats["normal_queue_curr_size"] = (double)file_cache_normal_queue_curr_size->value();
stats["normal_queue_max_elements"] = (double)file_cache_normal_queue_max_elements->value();
stats["normal_queue_curr_elements"] = (double)file_cache_normal_queue_curr_elements->value();

stats["disposable_queue_max_size"] = (double)file_cache_disposable_queue_max_size->value();
stats["disposable_queue_curr_size"] = (double)file_cache_disposable_queue_curr_size->value();
stats["disposable_queue_max_elements"] =
(double)file_cache_disposable_queue_max_elements->value();
stats["disposable_queue_curr_elements"] =
(double)file_cache_disposable_queue_curr_elements->value();

stats["segment_reader_cache_size"] = (double)IFileCache::file_reader_cache_size();

return stats;
}

} // namespace io
} // namespace doris
8 changes: 6 additions & 2 deletions be/src/io/cache/block/block_lru_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class LRUFileCache final : public IFileCache {
void change_cache_type(const Key& key, size_t offset, CacheType new_type,
std::lock_guard<std::mutex>& cache_lock) override;

std::map<std::string, double> get_stats() override;

size_t get_available_cache_size(CacheType cache_type) const;

Status load_cache_info_into_memory(std::lock_guard<std::mutex>& cache_lock);
Expand Down Expand Up @@ -206,13 +208,15 @@ class LRUFileCache final : public IFileCache {
std::thread _cache_background_thread;
std::atomic_bool _lazy_open_done {true};
std::thread _cache_background_load_thread;
size_t _num_read_segments = 0;
size_t _num_hit_segments = 0;
// size_t _num_read_segments = 0;
// size_t _num_hit_segments = 0;
size_t _num_removed_segments = 0;

std::shared_ptr<MetricEntity> _entity;

DoubleGauge* file_cache_hits_ratio = nullptr;
DoubleGauge* file_cache_hits_ratio_5m = nullptr;
DoubleGauge* file_cache_hits_ratio_1h = nullptr;
UIntGauge* file_cache_removed_elements = nullptr;

UIntGauge* file_cache_index_queue_max_size = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,4 @@ void RuntimeQueryStatiticsMgr::get_active_be_tasks_block(vectorized::Block* bloc
}
}

} // namespace doris
} // namespace doris
Loading