From 71b54dd4129508eb052b189879b62af3c8c8d63c Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Tue, 11 Aug 2020 16:02:34 +0800 Subject: [PATCH] [Metrics] Support tablet level metrics --- be/src/agent/task_worker_pool.cpp | 26 +- be/src/common/daemon.cpp | 18 +- be/src/exec/olap_scanner.cpp | 7 +- be/src/exec/tablet_sink.cpp | 4 +- be/src/http/action/compaction_action.cpp | 4 +- be/src/http/action/metrics_action.cpp | 5 +- be/src/http/action/stream_load.cpp | 24 +- be/src/http/action/stream_load.h | 10 +- be/src/olap/base_compaction.cpp | 4 +- be/src/olap/base_tablet.cpp | 16 +- be/src/olap/base_tablet.h | 7 + be/src/olap/cumulative_compaction.cpp | 4 +- be/src/olap/data_dir.cpp | 18 +- be/src/olap/data_dir.h | 10 +- be/src/olap/delta_writer.cpp | 6 +- be/src/olap/fs/block_manager_metrics.cpp | 18 +- be/src/olap/memtable.cpp | 4 +- be/src/olap/memtable_flush_executor.h | 2 +- be/src/olap/olap_meta.cpp | 12 +- .../rowset/segment_v2/segment_iterator.cpp | 8 +- be/src/olap/storage_engine.cpp | 8 +- be/src/olap/tablet.cpp | 6 + be/src/olap/tablet.h | 4 + be/src/olap/tablet_manager.cpp | 18 +- be/src/olap/task/engine_alter_tablet_task.cpp | 4 +- be/src/olap/task/engine_batch_load_task.cpp | 18 +- .../task/engine_storage_migration_task.cpp | 2 +- be/src/runtime/client_cache.cpp | 14 +- be/src/runtime/client_cache.h | 6 +- be/src/runtime/fragment_mgr.cpp | 4 +- be/src/runtime/mem_pool.cpp | 6 +- be/src/runtime/memory/chunk_allocator.cpp | 38 +-- be/src/runtime/memory/chunk_allocator.h | 2 +- .../stream_load/stream_load_executor.cpp | 12 +- be/src/runtime/tmp_file_mgr.cc | 10 +- be/src/runtime/tmp_file_mgr.h | 2 +- be/src/util/doris_metrics.cpp | 198 +++++++-------- be/src/util/doris_metrics.h | 238 +++++++++--------- be/src/util/metrics.cpp | 82 +++--- be/src/util/metrics.h | 87 +++++-- be/src/util/system_metrics.cpp | 214 ++++++++-------- be/src/util/system_metrics.h | 1 + be/src/util/thrift_server.cpp | 10 +- be/src/util/thrift_server.h | 6 +- 
be/test/agent/cgroups_mgr_test.cpp | 10 + be/test/exprs/hybird_set_test.cpp | 2 + be/test/http/metrics_action_test.cpp | 22 +- be/test/olap/push_handler_test.cpp | 2 +- be/test/util/doris_metrics_test.cpp | 51 ++-- be/test/util/new_metrics_test.cpp | 98 ++++---- be/test/util/system_metrics_test.cpp | 8 +- 51 files changed, 752 insertions(+), 638 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index b557f3eeafedd5..1b437a63ae0247 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -263,14 +263,14 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request) uint32_t try_time = 0; while (try_time < TASK_FINISH_MAX_RETRY) { - DorisMetrics::instance()->finish_task_requests_total.increment(1); + DorisMetrics::instance()->finish_task_requests_total->increment(1); AgentStatus client_status = _master_client->finish_task(finish_task_request, &result); if (client_status == DORIS_SUCCESS) { LOG(INFO) << "finish task success."; break; } else { - DorisMetrics::instance()->finish_task_requests_failed.increment(1); + DorisMetrics::instance()->finish_task_requests_failed->increment(1); LOG(WARNING) << "finish task failed. 
status_code=" << result.status.status_code; try_time += 1; } @@ -694,7 +694,7 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) { worker_pool_this->_tasks.pop_front(); } - DorisMetrics::instance()->publish_task_request_total.increment(1); + DorisMetrics::instance()->publish_task_request_total->increment(1); LOG(INFO) << "get publish version task, signature:" << agent_task_req.signature; Status st; @@ -718,7 +718,7 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) { TFinishTaskRequest finish_task_request; if (res != OLAP_SUCCESS) { - DorisMetrics::instance()->publish_task_failed_total.increment(1); + DorisMetrics::instance()->publish_task_failed_total->increment(1); // if publish failed, return failed, FE will ignore this error and // check error tablet ids and FE will also republish this task LOG(WARNING) << "publish version failed. signature:" << agent_task_req.signature @@ -890,7 +890,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { worker_pool_this->_tasks.pop_front(); } - DorisMetrics::instance()->clone_requests_total.increment(1); + DorisMetrics::instance()->clone_requests_total->increment(1); LOG(INFO) << "get clone task. signature:" << agent_task_req.signature; vector error_msgs; @@ -907,7 +907,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { TStatusCode::type status_code = TStatusCode::OK; if (status != DORIS_SUCCESS && status != DORIS_CREATE_TABLE_EXIST) { - DorisMetrics::instance()->clone_requests_failed.increment(1); + DorisMetrics::instance()->clone_requests_failed->increment(1); status_code = TStatusCode::RUNTIME_ERROR; LOG(WARNING) << "clone failed. 
signature: " << agent_task_req.signature; error_msgs.push_back("clone failed."); @@ -1050,12 +1050,12 @@ void* TaskWorkerPool::_report_task_worker_thread_callback(void* arg_this) { request.__set_tasks(_s_task_signatures); } - DorisMetrics::instance()->report_task_requests_total.increment(1); + DorisMetrics::instance()->report_task_requests_total->increment(1); TMasterResult result; AgentStatus status = worker_pool_this->_master_client->report(request, &result); if (status != DORIS_SUCCESS) { - DorisMetrics::instance()->report_task_requests_failed.increment(1); + DorisMetrics::instance()->report_task_requests_failed->increment(1); LOG(WARNING) << "finish report task failed. status:" << status << ", master host:" << worker_pool_this->_master_info.network_address.hostname << "port:" << worker_pool_this->_master_info.network_address.port; @@ -1103,12 +1103,12 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) } request.__set_disks(disks); - DorisMetrics::instance()->report_disk_requests_total.increment(1); + DorisMetrics::instance()->report_disk_requests_total->increment(1); TMasterResult result; AgentStatus status = worker_pool_this->_master_client->report(request, &result); if (status != DORIS_SUCCESS) { - DorisMetrics::instance()->report_disk_requests_failed.increment(1); + DorisMetrics::instance()->report_disk_requests_failed->increment(1); LOG(WARNING) << "finish report disk state failed. 
status:" << status << ", master host:" << worker_pool_this->_master_info.network_address.hostname << ", port:" << worker_pool_this->_master_info.network_address.port; @@ -1161,15 +1161,15 @@ void* TaskWorkerPool::_report_tablet_worker_thread_callback(void* arg_this) { #endif } int64_t max_compaction_score = - std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score.value(), - DorisMetrics::instance()->tablet_base_max_compaction_score.value()); + std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(), + DorisMetrics::instance()->tablet_base_max_compaction_score->value()); request.__set_tablet_max_compaction_score(max_compaction_score); TMasterResult result; status = worker_pool_this->_master_client->report(request, &result); if (status != DORIS_SUCCESS) { - DorisMetrics::instance()->report_all_tablets_requests_failed.increment(1); + DorisMetrics::instance()->report_all_tablets_requests_failed->increment(1); LOG(WARNING) << "finish report olap table state failed. 
status:" << status << ", master host:" << worker_pool_this->_master_info.network_address.hostname diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index f3516f308ccb20..be99b69bc70fd8 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -137,8 +137,8 @@ void* calculate_metrics(void* dummy) { if (last_ts == -1L) { last_ts = GetCurrentTimeMicros() / 1000; - lst_push_bytes = DorisMetrics::instance()->push_request_write_bytes.value(); - lst_query_bytes = DorisMetrics::instance()->query_scan_bytes.value(); + lst_push_bytes = DorisMetrics::instance()->push_request_write_bytes->value(); + lst_query_bytes = DorisMetrics::instance()->query_scan_bytes->value(); DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time); DorisMetrics::instance()->system_metrics()->get_network_traffic(&lst_net_send_bytes, &lst_net_receive_bytes); } else { @@ -147,21 +147,21 @@ void* calculate_metrics(void* dummy) { last_ts = current_ts; // 1. push bytes per second - int64_t current_push_bytes = DorisMetrics::instance()->push_request_write_bytes.value(); + int64_t current_push_bytes = DorisMetrics::instance()->push_request_write_bytes->value(); int64_t pps = (current_push_bytes - lst_push_bytes) / (interval + 1); - DorisMetrics::instance()->push_request_write_bytes_per_second.set_value( + DorisMetrics::instance()->push_request_write_bytes_per_second->set_value( pps < 0 ? 0 : pps); lst_push_bytes = current_push_bytes; // 2. query bytes per second - int64_t current_query_bytes = DorisMetrics::instance()->query_scan_bytes.value(); + int64_t current_query_bytes = DorisMetrics::instance()->query_scan_bytes->value(); int64_t qps = (current_query_bytes - lst_query_bytes) / (interval + 1); - DorisMetrics::instance()->query_scan_bytes_per_second.set_value( + DorisMetrics::instance()->query_scan_bytes_per_second->set_value( qps < 0 ? 0 : qps); lst_query_bytes = current_query_bytes; // 3. 
max disk io util - DorisMetrics::instance()->max_disk_io_util_percent.set_value( + DorisMetrics::instance()->max_disk_io_util_percent->set_value( DorisMetrics::instance()->system_metrics()->get_max_io_util(lst_disks_io_time, 15)); // update lst map DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time); @@ -171,8 +171,8 @@ void* calculate_metrics(void* dummy) { int64_t max_receive = 0; DorisMetrics::instance()->system_metrics()->get_max_net_traffic( lst_net_send_bytes, lst_net_receive_bytes, 15, &max_send, &max_receive); - DorisMetrics::instance()->max_network_send_bytes_rate.set_value(max_send); - DorisMetrics::instance()->max_network_receive_bytes_rate.set_value(max_receive); + DorisMetrics::instance()->max_network_send_bytes_rate->set_value(max_send); + DorisMetrics::instance()->max_network_receive_bytes_rate->set_value(max_receive); // update lst map DorisMetrics::instance()->system_metrics()->get_network_traffic(&lst_net_send_bytes, &lst_net_receive_bytes); } diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 38bbf7c6b52a44..aa3c3278e8fa7b 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -479,8 +479,11 @@ void OlapScanner::update_counter() { COUNTER_UPDATE(_parent->_filtered_segment_counter, _reader->stats().filtered_segment_number); COUNTER_UPDATE(_parent->_total_segment_counter, _reader->stats().total_segment_number); - DorisMetrics::instance()->query_scan_bytes.increment(_compressed_bytes_read); - DorisMetrics::instance()->query_scan_rows.increment(_raw_rows_read); + DorisMetrics::instance()->query_scan_bytes->increment(_compressed_bytes_read); + DorisMetrics::instance()->query_scan_rows->increment(_raw_rows_read); + + _tablet->query_scan_bytes->increment(_compressed_bytes_read); + _tablet->query_scan_rows->increment(_raw_rows_read); _has_update_counter = true; } diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 2845127525fa49..47a078257b6830 
100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -604,8 +604,8 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { // the real 'num_rows_load_total' will be set when sink being closed. state->update_num_rows_load_total(input_batch->num_rows()); state->update_num_bytes_load_total(input_batch->total_byte_size()); - DorisMetrics::instance()->load_rows.increment(input_batch->num_rows()); - DorisMetrics::instance()->load_bytes.increment(input_batch->total_byte_size()); + DorisMetrics::instance()->load_rows->increment(input_batch->num_rows()); + DorisMetrics::instance()->load_bytes->increment(input_batch->total_byte_size()); RowBatch* batch = input_batch; if (!_output_expr_ctxs.empty()) { SCOPED_RAW_TIMER(&_convert_batch_ns); diff --git a/be/src/http/action/compaction_action.cpp b/be/src/http/action/compaction_action.cpp index 505db91f9b2a56..6b8e2ffedc7a01 100644 --- a/be/src/http/action/compaction_action.cpp +++ b/be/src/http/action/compaction_action.cpp @@ -219,7 +219,7 @@ OLAPStatus CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet OLAPStatus res = base_compaction.compact(); if (res != OLAP_SUCCESS) { if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) { - DorisMetrics::instance()->base_compaction_request_failed.increment(1); + DorisMetrics::instance()->base_compaction_request_failed->increment(1); LOG(WARNING) << "failed to init base compaction. res=" << res << ", table=" << tablet->full_name(); } @@ -232,7 +232,7 @@ OLAPStatus CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet OLAPStatus res = cumulative_compaction.compact(); if (res != OLAP_SUCCESS) { if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) { - DorisMetrics::instance()->cumulative_compaction_request_failed.increment(1); + DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); LOG(WARNING) << "failed to do cumulative compaction. 
res=" << res << ", table=" << tablet->full_name(); } diff --git a/be/src/http/action/metrics_action.cpp b/be/src/http/action/metrics_action.cpp index 2ffca2f7860e73..809bf14360ea92 100644 --- a/be/src/http/action/metrics_action.cpp +++ b/be/src/http/action/metrics_action.cpp @@ -34,13 +34,14 @@ namespace doris { void MetricsAction::handle(HttpRequest* req) { const std::string& type = req->param("type"); + const std::string& with_tablet = req->param("with_tablet"); std::string str; if (type == "core") { str = _metric_registry->to_core_string(); } else if (type == "json") { - str = _metric_registry->to_json(); + str = _metric_registry->to_json(with_tablet == "true"); } else { - str = _metric_registry->to_prometheus(); + str = _metric_registry->to_prometheus(with_tablet == "true"); } req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4"); diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 4dc6253d5449e7..ed659203889126 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -89,15 +89,15 @@ static bool is_format_support_streaming(TFileFormatType::type format) { } StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) { - _stream_load_entity = DorisMetrics::instance()->metric_registry()->register_entity("stream_load", {}); - METRIC_REGISTER(_stream_load_entity, streaming_load_requests_total); - METRIC_REGISTER(_stream_load_entity, streaming_load_bytes); - METRIC_REGISTER(_stream_load_entity, streaming_load_duration_ms); - METRIC_REGISTER(_stream_load_entity, streaming_load_current_processing); + _stream_load_entity = DorisMetrics::instance()->metric_registry()->register_entity("stream_load"); + INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_requests_total); + INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_bytes); + INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_duration_ms); + 
INT_GAUGE_METRIC_REGISTER(_stream_load_entity, streaming_load_current_processing); } StreamLoadAction::~StreamLoadAction() { - DorisMetrics::instance()->metric_registry()->deregister_entity("stream_load"); + DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_entity); } void StreamLoadAction::handle(HttpRequest* req) { @@ -130,10 +130,10 @@ void StreamLoadAction::handle(HttpRequest* req) { HttpChannel::send_reply(req, str); // update statstics - streaming_load_requests_total.increment(1); - streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000); - streaming_load_bytes.increment(ctx->receive_bytes); - streaming_load_current_processing.increment(-1); + streaming_load_requests_total->increment(1); + streaming_load_duration_ms->increment(ctx->load_cost_nanos / 1000000); + streaming_load_bytes->increment(ctx->receive_bytes); + streaming_load_current_processing->increment(-1); } Status StreamLoadAction::_handle(StreamLoadContext* ctx) { @@ -165,7 +165,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) { } int StreamLoadAction::on_header(HttpRequest* req) { - streaming_load_current_processing.increment(1); + streaming_load_current_processing->increment(1); StreamLoadContext* ctx = new StreamLoadContext(_exec_env); ctx->ref(); @@ -196,7 +196,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { } auto str = ctx->to_json(); HttpChannel::send_reply(req, str); - streaming_load_current_processing.increment(-1); + streaming_load_current_processing->increment(-1); return -1; } return 0; diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index 1555981986d410..87ce0bf44461f6 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -54,11 +54,11 @@ class StreamLoadAction : public HttpHandler { private: ExecEnv* _exec_env; - MetricEntity* _stream_load_entity; - IntCounter streaming_load_requests_total; - IntCounter streaming_load_bytes; - IntCounter 
streaming_load_duration_ms; - IntGauge streaming_load_current_processing; + std::shared_ptr _stream_load_entity; + IntCounter* streaming_load_requests_total; + IntCounter* streaming_load_bytes; + IntCounter* streaming_load_duration_ms; + IntGauge* streaming_load_current_processing; }; } diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 2b6343fbf6fcbd..bb2404ca3be0b8 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -52,8 +52,8 @@ OLAPStatus BaseCompaction::compact() { _state = CompactionState::SUCCESS; // 4. add metric to base compaction - DorisMetrics::instance()->base_compaction_deltas_total.increment(_input_rowsets.size()); - DorisMetrics::instance()->base_compaction_bytes_total.increment(_input_rowsets_size); + DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size()); + DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_size); TRACE("save base compaction metrics"); return OLAP_SUCCESS; diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index d544f3653ed8a8..507bbc663725b0 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -17,20 +17,34 @@ #include "olap/base_tablet.h" +#include "gutil/strings/substitute.h" #include "olap/data_dir.h" +#include "util/doris_metrics.h" #include "util/path_util.h" namespace doris { +extern MetricPrototype METRIC_query_scan_bytes; +extern MetricPrototype METRIC_query_scan_rows; + BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) : _state(tablet_meta->tablet_state()), _tablet_meta(tablet_meta), _schema(tablet_meta->tablet_schema()), _data_dir(data_dir) { _gen_tablet_path(); + + _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( + strings::Substitute("Tablet.$0", tablet_id()), + {{"tablet_id", std::to_string(tablet_id())}}, + MetricEntityType::kTablet); + INT_COUNTER_METRIC_REGISTER(_metric_entity, 
query_scan_bytes); + INT_COUNTER_METRIC_REGISTER(_metric_entity, query_scan_rows); } -BaseTablet::~BaseTablet() {} +BaseTablet::~BaseTablet() { + DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity); +} OLAPStatus BaseTablet::set_tablet_state(TabletState state) { if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state != TABLET_SHUTDOWN) { diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index f3b0c2dfd6ae75..6097d23957d6be 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -23,6 +23,7 @@ #include "olap/olap_define.h" #include "olap/tablet_meta.h" #include "olap/utils.h" +#include "util/metrics.h" namespace doris { @@ -74,6 +75,12 @@ class BaseTablet : public std::enable_shared_from_this { DataDir* _data_dir; std::string _tablet_path; + // metrics of this tablet + std::shared_ptr _metric_entity = nullptr; +public: + IntCounter* query_scan_bytes; + IntCounter* query_scan_rows; + private: DISALLOW_COPY_AND_ASSIGN(BaseTablet); }; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index dcbe3d2be56086..6611e5102be336 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -66,8 +66,8 @@ OLAPStatus CumulativeCompaction::compact() { << _tablet->cumulative_layer_point() << ", tablet=" << _tablet->full_name(); // 6. 
add metric to cumulative compaction - DorisMetrics::instance()->cumulative_compaction_deltas_total.increment(_input_rowsets.size()); - DorisMetrics::instance()->cumulative_compaction_bytes_total.increment(_input_rowsets_size); + DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size()); + DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size); TRACE("save cumulative compaction metrics"); return OLAP_SUCCESS; diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 863f0654ce46e7..82613eb646f105 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -79,14 +79,14 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes, _current_shard(0), _meta(nullptr) { _data_dir_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(std::string("data_dir.") + path, {{"path", path}}); - METRIC_REGISTER(_data_dir_metric_entity, disks_total_capacity); - METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity); - METRIC_REGISTER(_data_dir_metric_entity, disks_data_used_capacity); - METRIC_REGISTER(_data_dir_metric_entity, disks_state); + INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_total_capacity); + INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity); + INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_data_used_capacity); + INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_state); } DataDir::~DataDir() { - DorisMetrics::instance()->metric_registry()->deregister_entity(std::string("data_dir.") + _path); + DorisMetrics::instance()->metric_registry()->deregister_entity(_data_dir_metric_entity); delete _id_generator; delete _meta; } @@ -314,7 +314,7 @@ void DataDir::health_check() { } } } - disks_state.set_value(_is_used ? 1 : 0); + disks_state->set_value(_is_used ? 
1 : 0); } OLAPStatus DataDir::_read_and_write_test_file() { @@ -937,8 +937,8 @@ Status DataDir::update_capacity() { "boost::filesystem::space failed"); } - disks_total_capacity.set_value(_disk_capacity_bytes); - disks_avail_capacity.set_value(_available_bytes); + disks_total_capacity->set_value(_disk_capacity_bytes); + disks_avail_capacity->set_value(_available_bytes); LOG(INFO) << "path: " << _path << " total capacity: " << _disk_capacity_bytes << ", available capacity: " << _available_bytes; @@ -946,7 +946,7 @@ Status DataDir::update_capacity() { } void DataDir::update_user_data_size(int64_t size) { - disks_data_used_capacity.set_value(size); + disks_data_used_capacity->set_value(size); } bool DataDir::reach_capacity_limit(int64_t incoming_data_size) { diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index 18cc6124bdd37c..be434de3dfaec3 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -192,11 +192,11 @@ class DataDir { // used in convert process bool _convert_old_data_success; - MetricEntity* _data_dir_metric_entity; - IntGauge disks_total_capacity; - IntGauge disks_avail_capacity; - IntGauge disks_data_used_capacity; - IntGauge disks_state; + std::shared_ptr _data_dir_metric_entity; + IntGauge* disks_total_capacity; + IntGauge* disks_avail_capacity; + IntGauge* disks_data_used_capacity; + IntGauge* disks_state; }; } // namespace doris diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 1960a16b41ba1f..b24a3410a5db3e 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -60,6 +60,10 @@ DeltaWriter::~DeltaWriter() { if (_flush_token != nullptr) { // cancel and wait all memtables in flush queue to be finished _flush_token->cancel(); + + const FlushStatistic& stat = _flush_token->get_stats(); + _tablet->flush_bytes->increment(stat.flush_size_bytes); + _tablet->flush_count->increment(stat.flush_count); } if (_tablet != nullptr) { @@ -92,7 +96,7 @@ OLAPStatus DeltaWriter::init() { 
TabletManager* tablet_mgr = _storage_engine->tablet_manager(); _tablet = tablet_mgr->get_tablet(_req.tablet_id, _req.schema_hash); if (_tablet == nullptr) { - LOG(WARNING) << "fail to find tablet . tablet_id=" << _req.tablet_id + LOG(WARNING) << "fail to find tablet. tablet_id=" << _req.tablet_id << ", schema_hash=" << _req.schema_hash; return OLAP_ERR_TABLE_NOT_FOUND; } diff --git a/be/src/olap/fs/block_manager_metrics.cpp b/be/src/olap/fs/block_manager_metrics.cpp index dabc93d50a9b1a..0eba2510d93ade 100644 --- a/be/src/olap/fs/block_manager_metrics.cpp +++ b/be/src/olap/fs/block_manager_metrics.cpp @@ -24,16 +24,16 @@ namespace fs { namespace internal { BlockManagerMetrics::BlockManagerMetrics() { - blocks_open_reading = &DorisMetrics::instance()->blocks_open_reading; - blocks_open_writing = &DorisMetrics::instance()->blocks_open_writing; + blocks_open_reading = DorisMetrics::instance()->blocks_open_reading; + blocks_open_writing = DorisMetrics::instance()->blocks_open_writing; - total_readable_blocks = &DorisMetrics::instance()->readable_blocks_total; - total_writable_blocks = &DorisMetrics::instance()->writable_blocks_total; - total_blocks_created = &DorisMetrics::instance()->blocks_created_total; - total_blocks_deleted = &DorisMetrics::instance()->blocks_deleted_total; - total_bytes_read = &DorisMetrics::instance()->bytes_read_total; - total_bytes_written = &DorisMetrics::instance()->bytes_written_total; - total_disk_sync = &DorisMetrics::instance()->disk_sync_total; + total_readable_blocks = DorisMetrics::instance()->readable_blocks_total; + total_writable_blocks = DorisMetrics::instance()->writable_blocks_total; + total_blocks_created = DorisMetrics::instance()->blocks_created_total; + total_blocks_deleted = DorisMetrics::instance()->blocks_deleted_total; + total_bytes_read = DorisMetrics::instance()->bytes_read_total; + total_bytes_written = DorisMetrics::instance()->bytes_written_total; + total_disk_sync = DorisMetrics::instance()->disk_sync_total; } } // 
namespace internal diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 25a6a81912f811..135a70af9adec0 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -124,8 +124,8 @@ OLAPStatus MemTable::flush() { } RETURN_NOT_OK(_rowset_writer->flush()); } - DorisMetrics::instance()->memtable_flush_total.increment(1); - DorisMetrics::instance()->memtable_flush_duration_us.increment(duration_ns / 1000); + DorisMetrics::instance()->memtable_flush_total->increment(1); + DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); return OLAP_SUCCESS; } diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index b3d0cbeb47d7c6..cb789c46cea6b1 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -35,7 +35,7 @@ class MemTable; // use atomic because it may be updated by multi threads struct FlushStatistic { int64_t flush_time_ns = 0; - int64_t flush_count= 0; + int64_t flush_count = 0; int64_t flush_size_bytes = 0; }; diff --git a/be/src/olap/olap_meta.cpp b/be/src/olap/olap_meta.cpp index a912f4fc07fa56..13ab08686c5d44 100755 --- a/be/src/olap/olap_meta.cpp +++ b/be/src/olap/olap_meta.cpp @@ -86,7 +86,7 @@ OLAPStatus OlapMeta::init() { } OLAPStatus OlapMeta::get(const int column_family_index, const std::string& key, std::string* value) { - DorisMetrics::instance()->meta_read_request_total.increment(1); + DorisMetrics::instance()->meta_read_request_total->increment(1); rocksdb::ColumnFamilyHandle* handle = _handles[column_family_index]; int64_t duration_ns = 0; rocksdb::Status s; @@ -94,7 +94,7 @@ OLAPStatus OlapMeta::get(const int column_family_index, const std::string& key, SCOPED_RAW_TIMER(&duration_ns); s = _db->Get(ReadOptions(), handle, Slice(key), value); } - DorisMetrics::instance()->meta_read_request_duration_us.increment(duration_ns / 1000); + DorisMetrics::instance()->meta_read_request_duration_us->increment(duration_ns / 1000); 
if (s.IsNotFound()) { return OLAP_ERR_META_KEY_NOT_FOUND; } else if (!s.ok()) { @@ -105,7 +105,7 @@ OLAPStatus OlapMeta::get(const int column_family_index, const std::string& key, } OLAPStatus OlapMeta::put(const int column_family_index, const std::string& key, const std::string& value) { - DorisMetrics::instance()->meta_write_request_total.increment(1); + DorisMetrics::instance()->meta_write_request_total->increment(1); rocksdb::ColumnFamilyHandle* handle = _handles[column_family_index]; int64_t duration_ns = 0; rocksdb::Status s; @@ -115,7 +115,7 @@ OLAPStatus OlapMeta::put(const int column_family_index, const std::string& key, write_options.sync = config::sync_tablet_meta; s = _db->Put(write_options, handle, Slice(key), Slice(value)); } - DorisMetrics::instance()->meta_write_request_duration_us.increment(duration_ns / 1000); + DorisMetrics::instance()->meta_write_request_duration_us->increment(duration_ns / 1000); if (!s.ok()) { LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString(); return OLAP_ERR_META_PUT; @@ -124,7 +124,7 @@ OLAPStatus OlapMeta::put(const int column_family_index, const std::string& key, } OLAPStatus OlapMeta::remove(const int column_family_index, const std::string& key) { - DorisMetrics::instance()->meta_write_request_total.increment(1); + DorisMetrics::instance()->meta_write_request_total->increment(1); rocksdb::ColumnFamilyHandle* handle = _handles[column_family_index]; rocksdb::Status s; int64_t duration_ns = 0; @@ -134,7 +134,7 @@ OLAPStatus OlapMeta::remove(const int column_family_index, const std::string& ke write_options.sync = config::sync_tablet_meta; s = _db->Delete(write_options, handle, Slice(key)); } - DorisMetrics::instance()->meta_write_request_duration_us.increment(duration_ns / 1000); + DorisMetrics::instance()->meta_write_request_duration_us->increment(duration_ns / 1000); if (!s.ok()) { LOG(WARNING) << "rocks db delete key:" << key << " failed, reason:" << s.ToString(); return 
OLAP_ERR_META_DELETE; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index ec8bb47415b4ca..90d8e8e51c4574 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -124,7 +124,7 @@ Status SegmentIterator::init(const StorageReadOptions& opts) { } Status SegmentIterator::_init() { - DorisMetrics::instance()->segment_read_total.increment(1); + DorisMetrics::instance()->segment_read_total->increment(1); // get file handle from file descriptor of segment fs::BlockManager* block_mgr = fs::fs_util::block_manager(); RETURN_IF_ERROR(block_mgr->open_block(_segment->_fname, &_rblock)); @@ -139,7 +139,7 @@ Status SegmentIterator::_init() { } Status SegmentIterator::_get_row_ranges_by_keys() { - DorisMetrics::instance()->segment_row_total.increment(num_rows()); + DorisMetrics::instance()->segment_row_total->increment(num_rows()); // fast path for empty segment or empty key ranges if (_row_bitmap.isEmpty() || _opts.key_ranges.empty()) { @@ -169,7 +169,7 @@ Status SegmentIterator::_get_row_ranges_by_keys() { size_t pre_size = _row_bitmap.cardinality(); _row_bitmap = RowRanges::ranges_to_roaring(result_ranges); _opts.stats->rows_key_range_filtered += (pre_size - _row_bitmap.cardinality()); - DorisMetrics::instance()->segment_rows_by_short_key.increment(_row_bitmap.cardinality()); + DorisMetrics::instance()->segment_rows_by_short_key->increment(_row_bitmap.cardinality()); return Status::OK(); } @@ -290,7 +290,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row RowRanges::ranges_intersection(zone_map_row_ranges, delete_condition_row_ranges, &zone_map_row_ranges); } - DorisMetrics::instance()->segment_rows_read_by_zone_map.increment(zone_map_row_ranges.count()); + DorisMetrics::instance()->segment_rows_read_by_zone_map->increment(zone_map_row_ranges.count()); pre_size = condition_row_ranges->count(); 
RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges, condition_row_ranges); _opts.stats->rows_stats_filtered += (pre_size - condition_row_ranges->count()); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 1a22e2cadf9209..b95e9d8598f021 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -561,7 +561,7 @@ void StorageEngine::_perform_cumulative_compaction(DataDir* data_dir) { } TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id); - DorisMetrics::instance()->cumulative_compaction_request_total.increment(1); + DorisMetrics::instance()->cumulative_compaction_request_total->increment(1); std::string tracker_label = "cumulative compaction " + std::to_string(syscall(__NR_gettid)); CumulativeCompaction cumulative_compaction(best_tablet, tracker_label, _compaction_mem_tracker); @@ -570,7 +570,7 @@ void StorageEngine::_perform_cumulative_compaction(DataDir* data_dir) { if (res != OLAP_SUCCESS) { best_tablet->set_last_cumu_compaction_failure_time(UnixMillis()); if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) { - DorisMetrics::instance()->cumulative_compaction_request_failed.increment(1); + DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); LOG(WARNING) << "failed to do cumulative compaction. 
res=" << res << ", table=" << best_tablet->full_name(); } @@ -597,7 +597,7 @@ void StorageEngine::_perform_base_compaction(DataDir* data_dir) { } TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id); - DorisMetrics::instance()->base_compaction_request_total.increment(1); + DorisMetrics::instance()->base_compaction_request_total->increment(1); std::string tracker_label = "base compaction " + std::to_string(syscall(__NR_gettid)); BaseCompaction base_compaction(best_tablet, tracker_label, _compaction_mem_tracker); @@ -605,7 +605,7 @@ void StorageEngine::_perform_base_compaction(DataDir* data_dir) { if (res != OLAP_SUCCESS) { best_tablet->set_last_base_compaction_failure_time(UnixMillis()); if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) { - DorisMetrics::instance()->base_compaction_request_failed.increment(1); + DorisMetrics::instance()->base_compaction_request_failed->increment(1); LOG(WARNING) << "failed to init base compaction. res=" << res << ", table=" << best_tablet->full_name(); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 8f3ca8c1254369..58768942ea028e 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -49,6 +49,9 @@ using std::sort; using std::string; using std::vector; +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_count, MetricUnit::OPERATIONS); + TabletSharedPtr Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) { return std::make_shared(tablet_meta, data_dir); @@ -67,6 +70,9 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir, // construct _timestamped_versioned_tracker from rs and stale rs meta _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas(), _tablet_meta->all_stale_rs_metas()); + + INT_COUNTER_METRIC_REGISTER(_metric_entity, flush_bytes); + INT_COUNTER_METRIC_REGISTER(_metric_entity, flush_count); } OLAPStatus Tablet::_init_once_action() { diff 
--git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 4f1ba22b928087..c32daad3b51008 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -301,6 +301,10 @@ class Tablet : public BaseTablet { std::unique_ptr _cumulative_compaction_policy; std::string _cumulative_compaction_type; DISALLOW_COPY_AND_ASSIGN(Tablet); + +public: + IntCounter* flush_bytes; + IntCounter* flush_count; }; inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 2290cf205855b1..6260f2163184e3 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -207,7 +207,7 @@ bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) { OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, std::vector stores) { - DorisMetrics::instance()->create_tablet_requests_total.increment(1); + DorisMetrics::instance()->create_tablet_requests_total->increment(1); int64_t tablet_id = request.tablet_id; int32_t schema_hash = request.tablet_schema.schema_hash; @@ -231,7 +231,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, } else { LOG(WARNING) << "fail to create tablet. tablet exist but with different schema_hash. 
" << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; - DorisMetrics::instance()->create_tablet_requests_failed.increment(1); + DorisMetrics::instance()->create_tablet_requests_failed->increment(1); return OLAP_ERR_CE_TABLET_ID_EXIST; } } @@ -247,7 +247,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, << "new_tablet_id=" << tablet_id << ", new_schema_hash=" << schema_hash << ", base_tablet_id=" << request.base_tablet_id << ", base_schema_hash=" << request.base_schema_hash; - DorisMetrics::instance()->create_tablet_requests_failed.increment(1); + DorisMetrics::instance()->create_tablet_requests_failed->increment(1); return OLAP_ERR_TABLE_CREATE_META_ERROR; } // If we are doing schema-change, we should use the same data dir @@ -262,7 +262,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, AlterTabletType::SCHEMA_CHANGE, request, is_schema_change, base_tablet.get(), stores); if (tablet == nullptr) { LOG(WARNING) << "fail to create tablet. tablet_id=" << request.tablet_id; - DorisMetrics::instance()->create_tablet_requests_failed.increment(1); + DorisMetrics::instance()->create_tablet_requests_failed->increment(1); return OLAP_ERR_CE_CMD_PARAMS_ERROR; } @@ -459,7 +459,7 @@ OLAPStatus TabletManager::drop_tablet( OLAPStatus TabletManager::_drop_tablet_unlocked( TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) { LOG(INFO) << "begin drop tablet. 
tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; - DorisMetrics::instance()->drop_tablet_requests_total.increment(1); + DorisMetrics::instance()->drop_tablet_requests_total->increment(1); // Fetch tablet which need to be droped TabletSharedPtr to_drop_tablet = _get_tablet_unlocked(tablet_id, schema_hash); @@ -764,9 +764,9 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com // TODO(lingbin): Remove 'max' from metric name, it would be misunderstood as the // biggest in history(like peak), but it is really just the value at current moment. if (compaction_type == CompactionType::BASE_COMPACTION) { - DorisMetrics::instance()->tablet_base_max_compaction_score.set_value(highest_score); + DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(highest_score); } else { - DorisMetrics::instance()->tablet_cumulative_max_compaction_score.set_value(highest_score); + DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(highest_score); } } return best_tablet; @@ -906,7 +906,7 @@ void TabletManager::release_schema_change_lock(TTabletId tablet_id) { } OLAPStatus TabletManager::report_tablet_info(TTabletInfo* tablet_info) { - DorisMetrics::instance()->report_tablet_requests_total.increment(1); + DorisMetrics::instance()->report_tablet_requests_total->increment(1); LOG(INFO) << "begin to process report tablet info." 
<< "tablet_id=" << tablet_info->tablet_id << ", schema_hash=" << tablet_info->schema_hash; @@ -934,7 +934,7 @@ OLAPStatus TabletManager::report_all_tablets_info(std::map* StorageEngine::instance()->txn_manager()->build_expire_txn_map(&expire_txn_map); LOG(INFO) << "find expired transactions for " << expire_txn_map.size() << " tablets"; - DorisMetrics::instance()->report_all_tablets_requests_total.increment(1); + DorisMetrics::instance()->report_all_tablets_requests_total->increment(1); for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) { ReadLock rlock(&_tablet_map_lock_array[i]); diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp index 2d768819a15ea0..0e7e1172e4e499 100644 --- a/be/src/olap/task/engine_alter_tablet_task.cpp +++ b/be/src/olap/task/engine_alter_tablet_task.cpp @@ -33,7 +33,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request, _process_name(process_name) { } OLAPStatus EngineAlterTabletTask::execute() { - DorisMetrics::instance()->create_rollup_requests_total.increment(1); + DorisMetrics::instance()->create_rollup_requests_total->increment(1); SchemaChangeHandler handler; OLAPStatus res = handler.process_alter_tablet_v2(_alter_tablet_req); @@ -44,7 +44,7 @@ OLAPStatus EngineAlterTabletTask::execute() { << ", base_schema_hash=" << _alter_tablet_req.base_schema_hash << ", new_tablet_id=" << _alter_tablet_req.new_tablet_id << ", new_schema_hash=" << _alter_tablet_req.new_schema_hash; - DorisMetrics::instance()->create_rollup_requests_failed.increment(1); + DorisMetrics::instance()->create_rollup_requests_failed->increment(1); return res; } diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index 7346f598d5ed98..cf9c5a83cc810d 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -295,7 +295,7 @@ OLAPStatus EngineBatchLoadTask::_push(const TPushReq& request, 
if (tablet_info_vec == nullptr) { LOG(WARNING) << "invalid output parameter which is nullptr pointer."; - DorisMetrics::instance()->push_requests_fail_total.increment(1); + DorisMetrics::instance()->push_requests_fail_total->increment(1); return OLAP_ERR_CE_CMD_PARAMS_ERROR; } @@ -304,7 +304,7 @@ OLAPStatus EngineBatchLoadTask::_push(const TPushReq& request, if (tablet == nullptr) { LOG(WARNING) << "false to find tablet. tablet=" << request.tablet_id << ", schema_hash=" << request.schema_hash; - DorisMetrics::instance()->push_requests_fail_total.increment(1); + DorisMetrics::instance()->push_requests_fail_total->increment(1); return OLAP_ERR_TABLE_NOT_FOUND; } @@ -334,16 +334,16 @@ OLAPStatus EngineBatchLoadTask::_push(const TPushReq& request, << "transaction_id=" << request.transaction_id << " tablet=" << tablet->full_name() << ", cost=" << PrettyPrinter::print(duration_ns, TUnit::TIME_NS); - DorisMetrics::instance()->push_requests_fail_total.increment(1); + DorisMetrics::instance()->push_requests_fail_total->increment(1); } else { LOG(INFO) << "success to push delta, " << "transaction_id=" << request.transaction_id << " tablet=" << tablet->full_name() << ", cost=" << PrettyPrinter::print(duration_ns, TUnit::TIME_NS); - DorisMetrics::instance()->push_requests_success_total.increment(1); - DorisMetrics::instance()->push_request_duration_us.increment(duration_ns / 1000); - DorisMetrics::instance()->push_request_write_bytes.increment(push_handler.write_bytes()); - DorisMetrics::instance()->push_request_write_rows.increment(push_handler.write_rows()); + DorisMetrics::instance()->push_requests_success_total->increment(1); + DorisMetrics::instance()->push_request_duration_us->increment(duration_ns / 1000); + DorisMetrics::instance()->push_request_write_bytes->increment(push_handler.write_bytes()); + DorisMetrics::instance()->push_request_write_rows->increment(push_handler.write_rows()); } return res; } @@ -352,7 +352,7 @@ OLAPStatus EngineBatchLoadTask::_delete_data( 
const TPushReq& request, vector* tablet_info_vec) { LOG(INFO) << "begin to process delete data. request=" << ThriftDebugString(request); - DorisMetrics::instance()->delete_requests_total.increment(1); + DorisMetrics::instance()->delete_requests_total->increment(1); OLAPStatus res = OLAP_SUCCESS; @@ -381,7 +381,7 @@ OLAPStatus EngineBatchLoadTask::_delete_data( OLAP_LOG_WARNING("fail to push empty version for delete data. " "[res=%d tablet='%s']", res, tablet->full_name().c_str()); - DorisMetrics::instance()->delete_requests_failed.increment(1); + DorisMetrics::instance()->delete_requests_failed->increment(1); return res; } diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index dc19f348f350cf..c7ffa374529c9f 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -42,7 +42,7 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate( LOG(INFO) << "begin to process storage media migrate. 
" << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash << ", dest_storage_medium=" << storage_medium; - DorisMetrics::instance()->storage_migrate_requests_total.increment(1); + DorisMetrics::instance()->storage_migrate_requests_total->increment(1); OLAPStatus res = OLAP_SUCCESS; TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash); diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp index cc5bc75ecec962..d93c22ed797f20 100644 --- a/be/src/runtime/client_cache.cpp +++ b/be/src/runtime/client_cache.cpp @@ -71,7 +71,7 @@ Status ClientCacheHelper::get_client( _client_map[*client_key]->set_recv_timeout(timeout_ms); if (_metrics_enabled) { - thrift_used_clients.increment(1); + thrift_used_clients->increment(1); } return Status::OK(); @@ -96,7 +96,7 @@ Status ClientCacheHelper::reopen_client(client_factory factory_method, void** cl *client_key = NULL; if (_metrics_enabled) { - thrift_opened_clients.increment(-1); + thrift_opened_clients->increment(-1); } RETURN_IF_ERROR(create_client(make_network_address( @@ -127,7 +127,7 @@ Status ClientCacheHelper::create_client( _client_map[*client_key] = client_impl.release(); if (_metrics_enabled) { - thrift_opened_clients.increment(1); + thrift_opened_clients->increment(1); } return Status::OK(); @@ -149,14 +149,14 @@ void ClientCacheHelper::release_client(void** client_key) { delete info; if (_metrics_enabled) { - thrift_opened_clients.increment(-1); + thrift_opened_clients->increment(-1); } } else { j->second.push_back(*client_key); } if (_metrics_enabled) { - thrift_used_clients.increment(-1); + thrift_used_clients->increment(-1); } *client_key = NULL; @@ -222,8 +222,8 @@ void ClientCacheHelper::init_metrics(const std::string& name) { _thrift_client_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( std::string("thrift_client.") + name, {{"name", name}}); - METRIC_REGISTER(_thrift_client_metric_entity, 
thrift_used_clients); - METRIC_REGISTER(_thrift_client_metric_entity, thrift_opened_clients); + INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_used_clients); + INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_opened_clients); _metrics_enabled = true; } diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h index 0af2ae5eefd3cd..fbcaede544223c 100644 --- a/be/src/runtime/client_cache.h +++ b/be/src/runtime/client_cache.h @@ -114,13 +114,13 @@ class ClientCacheHelper { // max connections per host in this cache, -1 means unlimited int _max_cache_size_per_host; - MetricEntity* _thrift_client_metric_entity; + std::shared_ptr _thrift_client_metric_entity; // Number of clients 'checked-out' from the cache - IntGauge thrift_used_clients; + IntGauge* thrift_used_clients; // Total clients in the cache, including those in use - IntGauge thrift_opened_clients; + IntGauge* thrift_opened_clients; // Create a new client for specific host/port in 'client' and put it in _client_map Status create_client(const TNetworkAddress& hostport, client_factory factory_method, diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c3bf83bb86863d..2b12ad4bb48328 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -220,8 +220,8 @@ Status FragmentExecState::execute() { print_id(_fragment_instance_id))); _executor.close(); } - DorisMetrics::instance()->fragment_requests_total.increment(1); - DorisMetrics::instance()->fragment_request_duration_us.increment(duration_ns / 1000); + DorisMetrics::instance()->fragment_requests_total->increment(1); + DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000); return Status::OK(); } diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 4232cde20092f4..799ec1a7b65b29 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -40,7 +40,7 @@ uint32_t 
MemPool::k_zero_length_region_ alignas(std::max_align_t) = MEM_POOL_POI MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_bytes(0) { - DorisMetrics::instance()->memory_pool_bytes_total.increment(chunk.size); + DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size); } MemPool::~MemPool() { @@ -50,7 +50,7 @@ MemPool::~MemPool() { ChunkAllocator::instance()->free(chunk.chunk); } mem_tracker_->Release(total_bytes_released); - DorisMetrics::instance()->memory_pool_bytes_total.increment(-total_bytes_released); + DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } void MemPool::clear() { @@ -76,7 +76,7 @@ void MemPool::free_all() { total_reserved_bytes_ = 0; mem_tracker_->Release(total_bytes_released); - DorisMetrics::instance()->memory_pool_bytes_total.increment(-total_bytes_released); + DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } bool MemPool::find_chunk(size_t min_size, bool check_limits) { diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp index 5fbc431dc6735d..22654b13f9778a 100644 --- a/be/src/runtime/memory/chunk_allocator.cpp +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -41,12 +41,12 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_free_count, MetricUnit::N DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_alloc_cost_ns, MetricUnit::NANOSECONDS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_free_cost_ns, MetricUnit::NANOSECONDS); -static IntCounter chunk_pool_local_core_alloc_count; -static IntCounter chunk_pool_other_core_alloc_count; -static IntCounter chunk_pool_system_alloc_count; -static IntCounter chunk_pool_system_free_count; -static IntCounter chunk_pool_system_alloc_cost_ns; -static IntCounter chunk_pool_system_free_cost_ns; +static IntCounter* chunk_pool_local_core_alloc_count; +static IntCounter* chunk_pool_other_core_alloc_count; +static IntCounter* 
chunk_pool_system_alloc_count; +static IntCounter* chunk_pool_system_free_count; +static IntCounter* chunk_pool_system_alloc_cost_ns; +static IntCounter* chunk_pool_system_free_cost_ns; #ifdef BE_TEST static std::mutex s_mutex; @@ -118,13 +118,13 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit) _arenas[i].reset(new ChunkArena()); } - _chunk_allocator_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity("chunk_allocator", {}); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_local_core_alloc_count); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_other_core_alloc_count); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_alloc_count); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_count); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_alloc_cost_ns); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_cost_ns); + _chunk_allocator_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity("chunk_allocator"); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_local_core_alloc_count); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_other_core_alloc_count); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_alloc_count); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_count); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_alloc_cost_ns); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_cost_ns); } bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { @@ -135,7 +135,7 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { if (_arenas[core_id]->pop_free_chunk(size, &chunk->data)) { _reserved_bytes.fetch_sub(size); - chunk_pool_local_core_alloc_count.increment(1); + chunk_pool_local_core_alloc_count->increment(1); return true; 
} if (_reserved_bytes > size) { @@ -144,7 +144,7 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { for (int i = 1; i < _arenas.size(); ++i, ++core_id) { if (_arenas[core_id % _arenas.size()]->pop_free_chunk(size, &chunk->data)) { _reserved_bytes.fetch_sub(size); - chunk_pool_other_core_alloc_count.increment(1); + chunk_pool_other_core_alloc_count->increment(1); // reset chunk's core_id to other chunk->core_id = core_id % _arenas.size(); return true; @@ -158,8 +158,8 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { // allocate from system allocator chunk->data = SystemAllocator::allocate(size); } - chunk_pool_system_alloc_count.increment(1); - chunk_pool_system_alloc_cost_ns.increment(cost_ns); + chunk_pool_system_alloc_count->increment(1); + chunk_pool_system_alloc_cost_ns->increment(cost_ns); if (chunk->data == nullptr) { return false; } @@ -177,8 +177,8 @@ void ChunkAllocator::free(const Chunk& chunk) { SCOPED_RAW_TIMER(&cost_ns); SystemAllocator::free(chunk.data, chunk.size); } - chunk_pool_system_free_count.increment(1); - chunk_pool_system_free_cost_ns.increment(cost_ns); + chunk_pool_system_free_count->increment(1); + chunk_pool_system_free_cost_ns->increment(cost_ns); return; } diff --git a/be/src/runtime/memory/chunk_allocator.h b/be/src/runtime/memory/chunk_allocator.h index b4bb7fa90a8727..38f84bbf1f47c2 100644 --- a/be/src/runtime/memory/chunk_allocator.h +++ b/be/src/runtime/memory/chunk_allocator.h @@ -77,7 +77,7 @@ class ChunkAllocator { // each core has a ChunkArena std::vector> _arenas; - MetricEntity* _chunk_allocator_metric_entity; + std::shared_ptr _chunk_allocator_metric_entity; }; } diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 1e5d4a196ddfe9..13dad53cfd2ba2 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -43,7 +43,7 @@ Status k_stream_load_plan_status; #endif 
Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { - DorisMetrics::instance()->txn_exec_plan_total.increment(1); + DorisMetrics::instance()->txn_exec_plan_total->increment(1); // submit this params #ifndef BE_TEST ctx->ref(); @@ -79,8 +79,8 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { } if (status.ok()) { - DorisMetrics::instance()->stream_receive_bytes_total.increment(ctx->receive_bytes); - DorisMetrics::instance()->stream_load_rows_total.increment(ctx->number_loaded_rows); + DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes); + DorisMetrics::instance()->stream_load_rows_total->increment(ctx->number_loaded_rows); } } else { LOG(WARNING) << "fragment execute failed" @@ -121,7 +121,7 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { } Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { - DorisMetrics::instance()->txn_begin_request_total.increment(1); + DorisMetrics::instance()->txn_begin_request_total->increment(1); TLoadTxnBeginRequest request; set_request_auth(&request, ctx->auth); @@ -162,7 +162,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { } Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { - DorisMetrics::instance()->txn_commit_request_total.increment(1); + DorisMetrics::instance()->txn_commit_request_total->increment(1); TLoadTxnCommitRequest request; set_request_auth(&request, ctx->auth); @@ -211,7 +211,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { } void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { - DorisMetrics::instance()->txn_rollback_request_total.increment(1); + DorisMetrics::instance()->txn_rollback_request_total->increment(1); TNetworkAddress master_addr = _exec_env->master_info()->network_address; TLoadTxnRollbackRequest request; diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc index a2c71bcc487de4..4e592a8a31e26a 100644 
--- a/be/src/runtime/tmp_file_mgr.cc +++ b/be/src/runtime/tmp_file_mgr.cc @@ -54,15 +54,15 @@ const uint64_t _s_available_space_threshold_mb = 1024; TmpFileMgr::TmpFileMgr(ExecEnv* exec_env) : _exec_env(exec_env), _initialized(false), _dir_status_lock(), _tmp_dirs() { - METRIC_REGISTER(DorisMetrics::instance()->metric_registry()->get_entity("server"), active_scratch_dirs); + INT_GAUGE_METRIC_REGISTER(DorisMetrics::instance()->server_entity(), active_scratch_dirs); } TmpFileMgr::TmpFileMgr() { - METRIC_REGISTER(DorisMetrics::instance()->metric_registry()->get_entity("server"), active_scratch_dirs); + INT_GAUGE_METRIC_REGISTER(DorisMetrics::instance()->server_entity(), active_scratch_dirs); } TmpFileMgr::~TmpFileMgr() { - METRIC_DEREGISTER(DorisMetrics::instance()->metric_registry()->get_entity("server"), active_scratch_dirs); + METRIC_DEREGISTER(DorisMetrics::instance()->server_entity(), active_scratch_dirs); } Status TmpFileMgr::init() { @@ -124,7 +124,7 @@ Status TmpFileMgr::init_custom(const vector& tmp_dirs, bool one_dir_per_ } } - active_scratch_dirs.set_value(_tmp_dirs.size()); + active_scratch_dirs->set_value(_tmp_dirs.size()); _initialized = true; @@ -176,7 +176,7 @@ void TmpFileMgr::blacklist_device(DeviceId device_id) { added = _tmp_dirs[device_id].blacklist(); } if (added) { - active_scratch_dirs.increment(-1); + active_scratch_dirs->increment(-1); } } diff --git a/be/src/runtime/tmp_file_mgr.h b/be/src/runtime/tmp_file_mgr.h index d7ad149b45d155..b19250a81fd6b2 100644 --- a/be/src/runtime/tmp_file_mgr.h +++ b/be/src/runtime/tmp_file_mgr.h @@ -192,7 +192,7 @@ class TmpFileMgr { std::vector _tmp_dirs; // Metric to track active scratch directories. 
- IntGauge active_scratch_dirs; + IntGauge* active_scratch_dirs; }; } // end namespace doris diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index f9169c73e47d9d..023074a66e9d0f 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -131,103 +131,103 @@ const std::string DorisMetrics::_s_registry_name = "doris_be"; const std::string DorisMetrics::_s_hook_name = "doris_metrics"; DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { - _server_metric_entity = _metric_registry.register_entity("server", {}); + _server_metric_entity = _metric_registry.register_entity("server"); - METRIC_REGISTER(_server_metric_entity, fragment_requests_total); - METRIC_REGISTER(_server_metric_entity, fragment_request_duration_us); - METRIC_REGISTER(_server_metric_entity, http_requests_total); - METRIC_REGISTER(_server_metric_entity, http_request_send_bytes); - METRIC_REGISTER(_server_metric_entity, query_scan_bytes); - METRIC_REGISTER(_server_metric_entity, query_scan_rows); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_request_duration_us); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, http_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, http_request_send_bytes); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_rows); - METRIC_REGISTER(_server_metric_entity, push_requests_success_total); - METRIC_REGISTER(_server_metric_entity, push_requests_fail_total); - METRIC_REGISTER(_server_metric_entity, push_request_duration_us); - METRIC_REGISTER(_server_metric_entity, push_request_write_bytes); - METRIC_REGISTER(_server_metric_entity, push_request_write_rows); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_requests_success_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_requests_fail_total); 
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_request_duration_us); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_request_write_bytes); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_request_write_rows); // engine_requests_total - METRIC_REGISTER(_server_metric_entity, create_tablet_requests_total); - METRIC_REGISTER(_server_metric_entity, create_tablet_requests_failed); - METRIC_REGISTER(_server_metric_entity, drop_tablet_requests_total); - METRIC_REGISTER(_server_metric_entity, report_all_tablets_requests_total); - METRIC_REGISTER(_server_metric_entity, report_all_tablets_requests_failed); - METRIC_REGISTER(_server_metric_entity, report_tablet_requests_total); - METRIC_REGISTER(_server_metric_entity, report_tablet_requests_failed); - METRIC_REGISTER(_server_metric_entity, report_disk_requests_total); - METRIC_REGISTER(_server_metric_entity, report_disk_requests_failed); - METRIC_REGISTER(_server_metric_entity, report_task_requests_total); - METRIC_REGISTER(_server_metric_entity, report_task_requests_failed); - METRIC_REGISTER(_server_metric_entity, schema_change_requests_total); - METRIC_REGISTER(_server_metric_entity, schema_change_requests_failed); - METRIC_REGISTER(_server_metric_entity, create_rollup_requests_total); - METRIC_REGISTER(_server_metric_entity, create_rollup_requests_failed); - METRIC_REGISTER(_server_metric_entity, storage_migrate_requests_total); - METRIC_REGISTER(_server_metric_entity, delete_requests_total); - METRIC_REGISTER(_server_metric_entity, delete_requests_failed); - METRIC_REGISTER(_server_metric_entity, clone_requests_total); - METRIC_REGISTER(_server_metric_entity, clone_requests_failed); - METRIC_REGISTER(_server_metric_entity, finish_task_requests_total); - METRIC_REGISTER(_server_metric_entity, finish_task_requests_failed); - METRIC_REGISTER(_server_metric_entity, base_compaction_request_total); - METRIC_REGISTER(_server_metric_entity, base_compaction_request_failed); - 
METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_total); - METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_failed); - METRIC_REGISTER(_server_metric_entity, publish_task_request_total); - METRIC_REGISTER(_server_metric_entity, publish_task_failed_total); - - METRIC_REGISTER(_server_metric_entity, base_compaction_deltas_total); - METRIC_REGISTER(_server_metric_entity, base_compaction_bytes_total); - METRIC_REGISTER(_server_metric_entity, cumulative_compaction_deltas_total); - METRIC_REGISTER(_server_metric_entity, cumulative_compaction_bytes_total); - - METRIC_REGISTER(_server_metric_entity, meta_write_request_total); - METRIC_REGISTER(_server_metric_entity, meta_write_request_duration_us); - METRIC_REGISTER(_server_metric_entity, meta_read_request_total); - METRIC_REGISTER(_server_metric_entity, meta_read_request_duration_us); - - METRIC_REGISTER(_server_metric_entity, segment_read_total); - METRIC_REGISTER(_server_metric_entity, segment_row_total); - METRIC_REGISTER(_server_metric_entity, segment_rows_by_short_key); - METRIC_REGISTER(_server_metric_entity, segment_rows_read_by_zone_map); - - METRIC_REGISTER(_server_metric_entity, txn_begin_request_total); - METRIC_REGISTER(_server_metric_entity, txn_commit_request_total); - METRIC_REGISTER(_server_metric_entity, txn_rollback_request_total); - METRIC_REGISTER(_server_metric_entity, txn_exec_plan_total); - METRIC_REGISTER(_server_metric_entity, stream_receive_bytes_total); - METRIC_REGISTER(_server_metric_entity, stream_load_rows_total); - - METRIC_REGISTER(_server_metric_entity, memtable_flush_total); - METRIC_REGISTER(_server_metric_entity, memtable_flush_duration_us); - - METRIC_REGISTER(_server_metric_entity, memory_pool_bytes_total); - METRIC_REGISTER(_server_metric_entity, process_thread_num); - METRIC_REGISTER(_server_metric_entity, process_fd_num_used); - METRIC_REGISTER(_server_metric_entity, process_fd_num_limit_soft); - METRIC_REGISTER(_server_metric_entity, 
process_fd_num_limit_hard); - - METRIC_REGISTER(_server_metric_entity, tablet_cumulative_max_compaction_score); - METRIC_REGISTER(_server_metric_entity, tablet_base_max_compaction_score); - - METRIC_REGISTER(_server_metric_entity, push_request_write_bytes_per_second); - METRIC_REGISTER(_server_metric_entity, query_scan_bytes_per_second); - METRIC_REGISTER(_server_metric_entity, max_disk_io_util_percent); - METRIC_REGISTER(_server_metric_entity, max_network_send_bytes_rate); - METRIC_REGISTER(_server_metric_entity, max_network_receive_bytes_rate); - - METRIC_REGISTER(_server_metric_entity, readable_blocks_total); - METRIC_REGISTER(_server_metric_entity, writable_blocks_total); - METRIC_REGISTER(_server_metric_entity, blocks_created_total); - METRIC_REGISTER(_server_metric_entity, blocks_deleted_total); - METRIC_REGISTER(_server_metric_entity, bytes_read_total); - METRIC_REGISTER(_server_metric_entity, bytes_written_total); - METRIC_REGISTER(_server_metric_entity, disk_sync_total); - METRIC_REGISTER(_server_metric_entity, blocks_open_reading); - METRIC_REGISTER(_server_metric_entity, blocks_open_writing); - - METRIC_REGISTER(_server_metric_entity, load_rows); - METRIC_REGISTER(_server_metric_entity, load_bytes); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_tablet_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_tablet_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, drop_tablet_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_all_tablets_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_all_tablets_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_tablet_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_tablet_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_disk_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_disk_requests_failed); + 
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_task_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_task_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, schema_change_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, schema_change_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_rollup_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_rollup_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, storage_migrate_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, delete_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, delete_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, clone_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, clone_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, finish_task_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, finish_task_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_request_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, publish_task_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, publish_task_failed_total); + + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_deltas_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_bytes_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_deltas_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_bytes_total); + + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
meta_write_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, meta_write_request_duration_us); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, meta_read_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, meta_read_request_duration_us); + + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_read_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_row_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_rows_by_short_key); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_rows_read_by_zone_map); + + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, txn_begin_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, txn_commit_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, txn_rollback_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, txn_exec_plan_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, stream_receive_bytes_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, stream_load_rows_total); + + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, memtable_flush_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, memtable_flush_duration_us); + + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, memory_pool_bytes_total); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, process_thread_num); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, process_fd_num_used); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, process_fd_num_limit_soft); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, process_fd_num_limit_hard); + + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, tablet_cumulative_max_compaction_score); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, tablet_base_max_compaction_score); + + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, push_request_write_bytes_per_second); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, query_scan_bytes_per_second); + 
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, max_disk_io_util_percent); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, max_network_send_bytes_rate); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, max_network_receive_bytes_rate); + + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, readable_blocks_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, writable_blocks_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, blocks_created_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, blocks_deleted_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, bytes_read_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, bytes_written_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, disk_sync_total); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, blocks_open_reading); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, blocks_open_writing); + + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_rows); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_bytes); _server_metric_entity->register_hook(_s_hook_name, std::bind(&DorisMetrics::_update, this)); } @@ -257,11 +257,11 @@ void DorisMetrics::_update_process_thread_num() { Status st = FileUtils::get_children_count(Env::Default(), ss.str(), &count); if (!st.ok()) { LOG(WARNING) << "failed to count thread num from: " << ss.str(); - process_thread_num.set_value(0); + process_thread_num->set_value(0); return; } - process_thread_num.set_value(count); + process_thread_num->set_value(count); } // get num of file descriptor of doris_be process @@ -275,10 +275,10 @@ void DorisMetrics::_update_process_fd_num() { Status st = FileUtils::get_children_count(Env::Default(), ss.str(), &count); if (!st.ok()) { LOG(WARNING) << "failed to count fd from: " << ss.str(); - process_fd_num_used.set_value(0); + process_fd_num_used->set_value(0); return; } - process_fd_num_used.set_value(count); + process_fd_num_used->set_value(count); // fd limits std::stringstream 
ss2; @@ -301,8 +301,8 @@ void DorisMetrics::_update_process_fd_num() { int num = sscanf(line_ptr, "Max open files %" PRId64 " %" PRId64, &values[0], &values[1]); if (num == 2) { - process_fd_num_limit_soft.set_value(values[0]); - process_fd_num_limit_hard.set_value(values[1]); + process_fd_num_limit_soft->set_value(values[0]); + process_fd_num_limit_hard->set_value(values[1]); break; } } diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 0b5a8bd78fa1bc..3ffbe746d88203 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -28,146 +28,148 @@ namespace doris { -#define REGISTER_HOOK_METRIC(name, func) \ - DorisMetrics::instance()->server_entity()->register_metric(&METRIC_##name, &DorisMetrics::instance()->name); \ - DorisMetrics::instance()->server_entity()->register_hook(#name, [&]() { \ - DorisMetrics::instance()->name.set_value(func()); \ +#define REGISTER_HOOK_METRIC(metric, func) \ + DorisMetrics::instance()->metric = \ + (UIntGauge*)(DorisMetrics::instance()->server_entity()-> \ + register_metric(&METRIC_##metric)); \ + DorisMetrics::instance()->server_entity()->register_hook(#metric, [&]() { \ + DorisMetrics::instance()->metric->set_value(func()); \ }); -#define DEREGISTER_HOOK_METRIC(name) \ - DorisMetrics::instance()->server_entity()->deregister_metric(&METRIC_##name); \ +#define DEREGISTER_HOOK_METRIC(name) \ + DorisMetrics::instance()->server_entity()->deregister_metric(&METRIC_##name); \ DorisMetrics::instance()->server_entity()->deregister_hook(#name); class DorisMetrics { public: - IntCounter fragment_requests_total; - IntCounter fragment_request_duration_us; - IntCounter http_requests_total; - IntCounter http_request_send_bytes; - IntCounter query_scan_bytes; - IntCounter query_scan_rows; - - IntCounter push_requests_success_total; - IntCounter push_requests_fail_total; - IntCounter push_request_duration_us; - IntCounter push_request_write_bytes; - IntCounter push_request_write_rows; - IntCounter 
create_tablet_requests_total; - IntCounter create_tablet_requests_failed; - IntCounter drop_tablet_requests_total; - - IntCounter report_all_tablets_requests_total; - IntCounter report_all_tablets_requests_failed; - IntCounter report_tablet_requests_total; - IntCounter report_tablet_requests_failed; - IntCounter report_disk_requests_total; - IntCounter report_disk_requests_failed; - IntCounter report_task_requests_total; - IntCounter report_task_requests_failed; - - IntCounter schema_change_requests_total; - IntCounter schema_change_requests_failed; - IntCounter create_rollup_requests_total; - IntCounter create_rollup_requests_failed; - IntCounter storage_migrate_requests_total; - IntCounter delete_requests_total; - IntCounter delete_requests_failed; - IntCounter clone_requests_total; - IntCounter clone_requests_failed; - - IntCounter finish_task_requests_total; - IntCounter finish_task_requests_failed; - - IntCounter base_compaction_request_total; - IntCounter base_compaction_request_failed; - IntCounter cumulative_compaction_request_total; - IntCounter cumulative_compaction_request_failed; - - IntCounter base_compaction_deltas_total; - IntCounter base_compaction_bytes_total; - IntCounter cumulative_compaction_deltas_total; - IntCounter cumulative_compaction_bytes_total; - - IntCounter publish_task_request_total; - IntCounter publish_task_failed_total; - - IntCounter meta_write_request_total; - IntCounter meta_write_request_duration_us; - IntCounter meta_read_request_total; - IntCounter meta_read_request_duration_us; + IntCounter* fragment_requests_total; + IntCounter* fragment_request_duration_us; + IntCounter* http_requests_total; + IntCounter* http_request_send_bytes; + IntCounter* query_scan_bytes; + IntCounter* query_scan_rows; + + IntCounter* push_requests_success_total; + IntCounter* push_requests_fail_total; + IntCounter* push_request_duration_us; + IntCounter* push_request_write_bytes; + IntCounter* push_request_write_rows; + IntCounter* 
create_tablet_requests_total; + IntCounter* create_tablet_requests_failed; + IntCounter* drop_tablet_requests_total; + + IntCounter* report_all_tablets_requests_total; + IntCounter* report_all_tablets_requests_failed; + IntCounter* report_tablet_requests_total; + IntCounter* report_tablet_requests_failed; + IntCounter* report_disk_requests_total; + IntCounter* report_disk_requests_failed; + IntCounter* report_task_requests_total; + IntCounter* report_task_requests_failed; + + IntCounter* schema_change_requests_total; + IntCounter* schema_change_requests_failed; + IntCounter* create_rollup_requests_total; + IntCounter* create_rollup_requests_failed; + IntCounter* storage_migrate_requests_total; + IntCounter* delete_requests_total; + IntCounter* delete_requests_failed; + IntCounter* clone_requests_total; + IntCounter* clone_requests_failed; + + IntCounter* finish_task_requests_total; + IntCounter* finish_task_requests_failed; + + IntCounter* base_compaction_request_total; + IntCounter* base_compaction_request_failed; + IntCounter* cumulative_compaction_request_total; + IntCounter* cumulative_compaction_request_failed; + + IntCounter* base_compaction_deltas_total; + IntCounter* base_compaction_bytes_total; + IntCounter* cumulative_compaction_deltas_total; + IntCounter* cumulative_compaction_bytes_total; + + IntCounter* publish_task_request_total; + IntCounter* publish_task_failed_total; + + IntCounter* meta_write_request_total; + IntCounter* meta_write_request_duration_us; + IntCounter* meta_read_request_total; + IntCounter* meta_read_request_duration_us; // Counters for segment_v2 // ----------------------- // total number of segments read - IntCounter segment_read_total; + IntCounter* segment_read_total; // total number of rows in queried segments (before index pruning) - IntCounter segment_row_total; + IntCounter* segment_row_total; // total number of rows selected by short key index - IntCounter segment_rows_by_short_key; + IntCounter* segment_rows_by_short_key; 
// total number of rows selected by zone map index - IntCounter segment_rows_read_by_zone_map; - - IntCounter txn_begin_request_total; - IntCounter txn_commit_request_total; - IntCounter txn_rollback_request_total; - IntCounter txn_exec_plan_total; - IntCounter stream_receive_bytes_total; - IntCounter stream_load_rows_total; - IntCounter load_rows; - IntCounter load_bytes; - - IntCounter memtable_flush_total; - IntCounter memtable_flush_duration_us; - - IntGauge memory_pool_bytes_total; - IntGauge process_thread_num; - IntGauge process_fd_num_used; - IntGauge process_fd_num_limit_soft; - IntGauge process_fd_num_limit_hard; + IntCounter* segment_rows_read_by_zone_map; + + IntCounter* txn_begin_request_total; + IntCounter* txn_commit_request_total; + IntCounter* txn_rollback_request_total; + IntCounter* txn_exec_plan_total; + IntCounter* stream_receive_bytes_total; + IntCounter* stream_load_rows_total; + IntCounter* load_rows; + IntCounter* load_bytes; + + IntCounter* memtable_flush_total; + IntCounter* memtable_flush_duration_us; + + IntGauge* memory_pool_bytes_total; + IntGauge* process_thread_num; + IntGauge* process_fd_num_used; + IntGauge* process_fd_num_limit_soft; + IntGauge* process_fd_num_limit_hard; // the max compaction score of all tablets. // Record base and cumulative scores separately, because // we need to get the larger of the two. 
- IntGauge tablet_cumulative_max_compaction_score; - IntGauge tablet_base_max_compaction_score; + IntGauge* tablet_cumulative_max_compaction_score; + IntGauge* tablet_base_max_compaction_score; // The following metrics will be calculated // by metric calculator - IntGauge push_request_write_bytes_per_second; - IntGauge query_scan_bytes_per_second; - IntGauge max_disk_io_util_percent; - IntGauge max_network_send_bytes_rate; - IntGauge max_network_receive_bytes_rate; + IntGauge* push_request_write_bytes_per_second; + IntGauge* query_scan_bytes_per_second; + IntGauge* max_disk_io_util_percent; + IntGauge* max_network_send_bytes_rate; + IntGauge* max_network_receive_bytes_rate; // Metrics related with BlockManager - IntCounter readable_blocks_total; - IntCounter writable_blocks_total; - IntCounter blocks_created_total; - IntCounter blocks_deleted_total; - IntCounter bytes_read_total; - IntCounter bytes_written_total; - IntCounter disk_sync_total; - IntGauge blocks_open_reading; - IntGauge blocks_open_writing; + IntCounter* readable_blocks_total; + IntCounter* writable_blocks_total; + IntCounter* blocks_created_total; + IntCounter* blocks_deleted_total; + IntCounter* bytes_read_total; + IntCounter* bytes_written_total; + IntCounter* disk_sync_total; + IntGauge* blocks_open_reading; + IntGauge* blocks_open_writing; // Size of some global containers - UIntGauge rowset_count_generated_and_in_use; - UIntGauge unused_rowsets_count; - UIntGauge broker_count; - UIntGauge data_stream_receiver_count; - UIntGauge fragment_endpoint_count; - UIntGauge active_scan_context_count; - UIntGauge plan_fragment_count; - UIntGauge load_channel_count; - UIntGauge result_buffer_block_count; - UIntGauge result_block_queue_count; - UIntGauge routine_load_task_count; - UIntGauge small_file_cache_count; - UIntGauge stream_load_pipe_count; - UIntGauge brpc_endpoint_stub_count; - UIntGauge tablet_writer_count; - - UIntGauge compaction_mem_current_consumption; + UIntGauge* 
rowset_count_generated_and_in_use; + UIntGauge* unused_rowsets_count; + UIntGauge* broker_count; + UIntGauge* data_stream_receiver_count; + UIntGauge* fragment_endpoint_count; + UIntGauge* active_scan_context_count; + UIntGauge* plan_fragment_count; + UIntGauge* load_channel_count; + UIntGauge* result_buffer_block_count; + UIntGauge* result_block_queue_count; + UIntGauge* routine_load_task_count; + UIntGauge* small_file_cache_count; + UIntGauge* stream_load_pipe_count; + UIntGauge* brpc_endpoint_stub_count; + UIntGauge* tablet_writer_count; + + UIntGauge* compaction_mem_current_consumption; static DorisMetrics* instance() { static DorisMetrics instance; @@ -182,7 +184,7 @@ class DorisMetrics { MetricRegistry* metric_registry() { return &_metric_registry; } SystemMetrics* system_metrics() { return _system_metrics.get(); } - MetricEntity* server_entity() { return _server_metric_entity; } + MetricEntity* server_entity() { return _server_metric_entity.get(); } private: // Don't allow constrctor @@ -200,7 +202,7 @@ class DorisMetrics { std::unique_ptr _system_metrics; - MetricEntity* _server_metric_entity; + std::shared_ptr _server_metric_entity; }; }; // namespace doris diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp index 1a1ff935ab8ff1..8a53d916e355ca 100644 --- a/be/src/util/metrics.cpp +++ b/be/src/util/metrics.cpp @@ -110,15 +110,13 @@ std::string MetricPrototype::combine_name(const std::string& registry_name) cons return (registry_name.empty() ? std::string() : registry_name + "_") + simple_name(); } -void MetricEntity::register_metric(const MetricPrototype* metric_type, Metric* metric) { - std::lock_guard l(_lock); - DCHECK(_metrics.find(metric_type) == _metrics.end()) << "metric is already exist! 
" << _name << ":" << metric_type->name; - _metrics.emplace(metric_type, metric); -} - void MetricEntity::deregister_metric(const MetricPrototype* metric_type) { std::lock_guard l(_lock); - _metrics.erase(metric_type); + auto metric = _metrics.find(metric_type); + if (metric != _metrics.end()) { + delete metric->second; + _metrics.erase(metric); + } } Metric* MetricEntity::get_metric(const std::string& name, const std::string& group_name) const { @@ -156,47 +154,62 @@ void MetricEntity::trigger_hook_unlocked(bool force) const { MetricRegistry::~MetricRegistry() { } -MetricEntity* MetricRegistry::register_entity(const std::string& name, const Labels& labels) { - std::shared_ptr entity = std::make_shared(name, labels); - +std::shared_ptr MetricRegistry::register_entity(const std::string& name, const Labels& labels, MetricEntityType type) { + std::shared_ptr entity = std::make_shared(type, name, labels); std::lock_guard l(_lock); - DCHECK(_entities.find(name) == _entities.end()) << name; - _entities.insert(std::make_pair(name, entity)); - return entity.get(); + auto inserted_entity = _entities.insert(std::make_pair(entity, 1)); + if (!inserted_entity.second) { + // If exist, increase the registered count + inserted_entity.first->second++; + } + return inserted_entity.first->first; } -void MetricRegistry::deregister_entity(const std::string& name) { +void MetricRegistry::deregister_entity(const std::shared_ptr& entity) { std::lock_guard l(_lock); - _entities.erase(name); + auto found_entity = _entities.find(entity); + if (found_entity != _entities.end()) { + // Decrease the registered count + --found_entity->second; + if (found_entity->second == 0) { + // Only erase it when registered count is zero + _entities.erase(found_entity); + } + } } -std::shared_ptr MetricRegistry::get_entity(const std::string& name) { +std::shared_ptr MetricRegistry::get_entity(const std::string& name, const Labels& labels, MetricEntityType type) { + std::shared_ptr dummy = 
std::make_shared(type, name, labels); + std::lock_guard l(_lock); - auto entity = _entities.find(name); + auto entity = _entities.find(dummy); if (entity == _entities.end()) { return std::shared_ptr(); } - return entity->second; + return entity->first; } void MetricRegistry::trigger_all_hooks(bool force) const { std::lock_guard l(_lock); for (const auto& entity : _entities) { - std::lock_guard l(entity.second->_lock); - entity.second->trigger_hook_unlocked(force); + std::lock_guard l(entity.first->_lock); + entity.first->trigger_hook_unlocked(force); } } -std::string MetricRegistry::to_prometheus() const { +std::string MetricRegistry::to_prometheus(bool with_tablet_metrics) const { std::stringstream ss; // Reorder by MetricPrototype EntityMetricsByType entity_metrics_by_types; std::lock_guard l(_lock); for (const auto& entity : _entities) { - std::lock_guard l(entity.second->_lock); - entity.second->trigger_hook_unlocked(false); - for (const auto& metric : entity.second->_metrics) { - std::pair new_elem = std::make_pair(entity.second.get(), metric.second); + if (entity.first->_type == MetricEntityType::kTablet && !with_tablet_metrics) { + continue; + } + std::lock_guard l(entity.first->_lock); + entity.first->trigger_hook_unlocked(false); + for (const auto& metric : entity.first->_metrics) { + std::pair new_elem = std::make_pair(entity.first.get(), metric.second); auto found = entity_metrics_by_types.find(metric.first); if (found == entity_metrics_by_types.end()) { entity_metrics_by_types.emplace(metric.first, std::vector>({new_elem})); @@ -224,14 +237,17 @@ std::string MetricRegistry::to_prometheus() const { return ss.str(); } -std::string MetricRegistry::to_json() const { +std::string MetricRegistry::to_json(bool with_tablet_metrics) const { rj::Document doc{rj::kArrayType}; rj::Document::AllocatorType& allocator = doc.GetAllocator(); std::lock_guard l(_lock); for (const auto& entity : _entities) { - std::lock_guard l(entity.second->_lock); - 
entity.second->trigger_hook_unlocked(false); - for (const auto& metric : entity.second->_metrics) { + if (entity.first->_type == MetricEntityType::kTablet && !with_tablet_metrics) { + continue; + } + std::lock_guard l(entity.first->_lock); + entity.first->trigger_hook_unlocked(false); + for (const auto& metric : entity.first->_metrics) { rj::Value metric_obj(rj::kObjectType); // tags rj::Value tag_obj(rj::kObjectType); @@ -244,7 +260,7 @@ std::string MetricRegistry::to_json() const { allocator); } // MetricEntity's labels - for (auto& label : entity.second->_labels) { + for (auto& label : entity.first->_labels) { tag_obj.AddMember( rj::Value(label.first.c_str(), allocator), rj::Value(label.second.c_str(), allocator), @@ -270,9 +286,9 @@ std::string MetricRegistry::to_core_string() const { std::stringstream ss; std::lock_guard l(_lock); for (const auto& entity : _entities) { - std::lock_guard l(entity.second->_lock); - entity.second->trigger_hook_unlocked(false); - for (const auto &metric : entity.second->_metrics) { + std::lock_guard l(entity.first->_lock); + entity.first->trigger_hook_unlocked(false); + for (const auto &metric : entity.first->_metrics) { if (metric.first->is_core_metric) { ss << metric.first->combine_name(_name) << " LONG " << metric.second->to_string() << "\n"; } diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h index 50093fe1da14ac..61fccce7b96a80 100644 --- a/be/src/util/metrics.h +++ b/be/src/util/metrics.h @@ -212,6 +212,14 @@ class LockGauge : public LockSimpleMetric { virtual ~LockGauge() {} }; +using IntCounter = CoreLocalCounter; +using IntAtomicCounter = AtomicCounter; +using UIntCounter = CoreLocalCounter; +using DoubleCounter = LockCounter; +using IntGauge = AtomicGauge; +using UIntGauge = AtomicGauge; +using DoubleGauge = LockGauge; + using Labels = std::unordered_map; struct MetricPrototype { public: @@ -263,8 +271,17 @@ struct MetricPrototype { #define DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(name, unit, desc) \ 
DEFINE_METRIC_PROTOTYPE(name, MetricType::GAUGE, unit, desc, "", Labels(), false) -#define METRIC_REGISTER(entity, metric) \ - entity->register_metric(&METRIC_##metric, &metric) +#define INT_COUNTER_METRIC_REGISTER(entity, metric) \ + metric = (IntCounter*)(entity->register_metric(&METRIC_##metric)) + +#define INT_GAUGE_METRIC_REGISTER(entity, metric) \ + metric = (IntGauge*)(entity->register_metric(&METRIC_##metric)) + +#define INT_UGAUGE_METRIC_REGISTER(entity, metric) \ + metric = (UIntGauge*)(entity->register_metric(&METRIC_##metric)) + +#define INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, metric) \ + metric = (IntAtomicCounter*)(entity->register_metric(&METRIC_##metric)) #define METRIC_DEREGISTER(entity, metric) \ entity->deregister_metric(&METRIC_##metric) @@ -284,12 +301,34 @@ struct MetricPrototypeEqualTo { using MetricMap = std::unordered_map; +enum class MetricEntityType { + kServer, + kTablet +}; + class MetricEntity { public: - MetricEntity(const std::string& name, const Labels& labels) - : _name(name), _labels(labels) {} + MetricEntity(MetricEntityType type, const std::string& name, const Labels& labels) + : _type(type), _name(name), _labels(labels) {} + ~MetricEntity() { + for (auto& metric : _metrics) { + delete metric.second; + } + } + + const std::string& name() const { return _name; } + + template + Metric* register_metric(const MetricPrototype* metric_type) { + std::lock_guard l(_lock); + auto inserted_metric = _metrics.insert(std::make_pair(metric_type, nullptr)); + if (inserted_metric.second) { + // If not exist, make a new metric pointer + inserted_metric.first->second = new T(); + } + return inserted_metric.first->second; + } - void register_metric(const MetricPrototype* metric_type, Metric* metric); void deregister_metric(const MetricPrototype* metric_type); Metric* get_metric(const std::string& name, const std::string& group_name = "") const; @@ -300,7 +339,10 @@ class MetricEntity { private: friend class MetricRegistry; + friend class 
MetricEntityHash; + friend class MetricEntityEqualTo; + MetricEntityType _type; std::string _name; Labels _labels; @@ -309,6 +351,20 @@ class MetricEntity { std::map> _hooks; }; +struct MetricEntityHash { + size_t operator()(const std::shared_ptr metric_entity) const { + return std::hash()(metric_entity->name()); + } +}; + +struct MetricEntityEqualTo { + bool operator()(const std::shared_ptr first, const std::shared_ptr second) const { + return first->_type == second->_type + && first->_name == second->_name + && first->_labels == second->_labels; + } +}; + using EntityMetricsByType = std::unordered_map>, MetricPrototypeHash, MetricPrototypeEqualTo>; class MetricRegistry { @@ -316,29 +372,22 @@ class MetricRegistry { MetricRegistry(const std::string& name) : _name(name) {} ~MetricRegistry(); - MetricEntity* register_entity(const std::string& name, const Labels& labels); - void deregister_entity(const std::string& name); - std::shared_ptr get_entity(const std::string& name); + std::shared_ptr register_entity(const std::string& name, const Labels& labels = {}, MetricEntityType type = MetricEntityType::kServer); + void deregister_entity(const std::shared_ptr& entity); + std::shared_ptr get_entity(const std::string& name, const Labels& labels = {}, MetricEntityType type = MetricEntityType::kServer); void trigger_all_hooks(bool force) const; - std::string to_prometheus() const; - std::string to_json() const; + std::string to_prometheus(bool with_tablet_metrics = false) const; + std::string to_json(bool with_tablet_metrics = false) const; std::string to_core_string() const; private: const std::string _name; mutable SpinLock _lock; - std::unordered_map> _entities; + // MetricEntity -> register count + std::unordered_map, int32_t, MetricEntityHash, MetricEntityEqualTo> _entities; }; -using IntCounter = CoreLocalCounter; -using IntAtomicCounter = AtomicCounter; -using UIntCounter = CoreLocalCounter; -using DoubleCounter = LockCounter; -using IntGauge = AtomicGauge; -using 
UIntGauge = AtomicGauge; -using DoubleGauge = LockGauge; - } // namespace doris diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp index c1aae9e7619d83..a51a624639ace3 100644 --- a/be/src/util/system_metrics.cpp +++ b/be/src/util/system_metrics.cpp @@ -43,42 +43,42 @@ DEFINE_CPU_COUNTER_METRIC(guest_nice); // /proc/stat: http://www.linuxhowtos.org/System/procstat.htm struct CpuMetrics { CpuMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, cpu_user); - METRIC_REGISTER(entity, cpu_nice); - METRIC_REGISTER(entity, cpu_system); - METRIC_REGISTER(entity, cpu_idle); - METRIC_REGISTER(entity, cpu_iowait); - METRIC_REGISTER(entity, cpu_irq); - METRIC_REGISTER(entity, cpu_soft_irq); - METRIC_REGISTER(entity, cpu_steal); - METRIC_REGISTER(entity, cpu_guest); - METRIC_REGISTER(entity, cpu_guest_nice); - - metrics[0] = &cpu_user; - metrics[1] = &cpu_nice; - metrics[2] = &cpu_system; - metrics[3] = &cpu_idle; - metrics[4] = &cpu_iowait; - metrics[5] = &cpu_irq; - metrics[6] = &cpu_soft_irq; - metrics[7] = &cpu_steal; - metrics[8] = &cpu_guest; - metrics[9] = &cpu_guest_nice; + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_user); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_nice); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_system); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_idle); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_iowait); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_irq); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_soft_irq); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_steal); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_guest); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_guest_nice); + + metrics[0] = cpu_user; + metrics[1] = cpu_nice; + metrics[2] = cpu_system; + metrics[3] = cpu_idle; + metrics[4] = cpu_iowait; + metrics[5] = cpu_irq; + metrics[6] = cpu_soft_irq; + metrics[7] = cpu_steal; + metrics[8] = cpu_guest; + metrics[9] = cpu_guest_nice; } static constexpr int 
cpu_num_metrics = 10; MetricEntity* entity = nullptr; - IntAtomicCounter cpu_user; - IntAtomicCounter cpu_nice; - IntAtomicCounter cpu_system; - IntAtomicCounter cpu_idle; - IntAtomicCounter cpu_iowait; - IntAtomicCounter cpu_irq; - IntAtomicCounter cpu_soft_irq; - IntAtomicCounter cpu_steal; - IntAtomicCounter cpu_guest; - IntAtomicCounter cpu_guest_nice; + IntAtomicCounter* cpu_user; + IntAtomicCounter* cpu_nice; + IntAtomicCounter* cpu_system; + IntAtomicCounter* cpu_idle; + IntAtomicCounter* cpu_iowait; + IntAtomicCounter* cpu_irq; + IntAtomicCounter* cpu_soft_irq; + IntAtomicCounter* cpu_steal; + IntAtomicCounter* cpu_guest; + IntAtomicCounter* cpu_guest_nice; IntAtomicCounter* metrics[cpu_num_metrics]; }; @@ -89,11 +89,11 @@ DEFINE_MEMORY_GAUGE_METRIC(allocated_bytes, MetricUnit::BYTES); struct MemoryMetrics { MemoryMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, memory_allocated_bytes); + INT_GAUGE_METRIC_REGISTER(entity, memory_allocated_bytes); } MetricEntity* entity = nullptr; - IntGauge memory_allocated_bytes; + IntGauge* memory_allocated_bytes; }; #define DEFINE_DISK_COUNTER_METRIC(metric, unit) \ @@ -109,25 +109,25 @@ DEFINE_DISK_COUNTER_METRIC(io_time_weigthed, MetricUnit::MILLISECONDS); struct DiskMetrics { DiskMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, disk_reads_completed); - METRIC_REGISTER(entity, disk_bytes_read); - METRIC_REGISTER(entity, disk_read_time_ms); - METRIC_REGISTER(entity, disk_writes_completed); - METRIC_REGISTER(entity, disk_bytes_written); - METRIC_REGISTER(entity, disk_write_time_ms); - METRIC_REGISTER(entity, disk_io_time_ms); - METRIC_REGISTER(entity, disk_io_time_weigthed); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_reads_completed); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_bytes_read); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_read_time_ms); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_writes_completed); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, 
disk_bytes_written); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_write_time_ms); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_io_time_ms); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_io_time_weigthed); } MetricEntity* entity = nullptr; - IntAtomicCounter disk_reads_completed; - IntAtomicCounter disk_bytes_read; - IntAtomicCounter disk_read_time_ms; - IntAtomicCounter disk_writes_completed; - IntAtomicCounter disk_bytes_written; - IntAtomicCounter disk_write_time_ms; - IntAtomicCounter disk_io_time_ms; - IntAtomicCounter disk_io_time_weigthed; + IntAtomicCounter* disk_reads_completed; + IntAtomicCounter* disk_bytes_read; + IntAtomicCounter* disk_read_time_ms; + IntAtomicCounter* disk_writes_completed; + IntAtomicCounter* disk_bytes_written; + IntAtomicCounter* disk_write_time_ms; + IntAtomicCounter* disk_io_time_ms; + IntAtomicCounter* disk_io_time_weigthed; }; #define DEFINE_NETWORK_COUNTER_METRIC(metric, unit) \ @@ -139,17 +139,17 @@ DEFINE_NETWORK_COUNTER_METRIC(send_packets, MetricUnit::PACKETS); struct NetworkMetrics { NetworkMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, network_receive_bytes); - METRIC_REGISTER(entity, network_receive_packets); - METRIC_REGISTER(entity, network_send_bytes); - METRIC_REGISTER(entity, network_send_packets); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_receive_bytes); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_receive_packets); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_send_bytes); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_send_packets); } MetricEntity* entity = nullptr; - IntAtomicCounter network_receive_bytes; - IntAtomicCounter network_receive_packets; - IntAtomicCounter network_send_bytes; - IntAtomicCounter network_send_packets; + IntAtomicCounter* network_receive_bytes; + IntAtomicCounter* network_receive_packets; + IntAtomicCounter* network_send_bytes; + IntAtomicCounter* network_send_packets; }; #define DEFINE_SNMP_COUNTER_METRIC(metric, unit, 
desc) \ @@ -162,17 +162,17 @@ DEFINE_SNMP_COUNTER_METRIC(tcp_out_segs, MetricUnit::NOUNIT, "All send TCP packe // metrics read from /proc/net/snmp struct SnmpMetrics { SnmpMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, snmp_tcp_in_errs); - METRIC_REGISTER(entity, snmp_tcp_retrans_segs); - METRIC_REGISTER(entity, snmp_tcp_in_segs); - METRIC_REGISTER(entity, snmp_tcp_out_segs); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_in_errs); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_retrans_segs); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_in_segs); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_out_segs); } MetricEntity* entity = nullptr; - IntAtomicCounter snmp_tcp_in_errs; - IntAtomicCounter snmp_tcp_retrans_segs; - IntAtomicCounter snmp_tcp_in_segs; - IntAtomicCounter snmp_tcp_out_segs; + IntAtomicCounter* snmp_tcp_in_errs; + IntAtomicCounter* snmp_tcp_retrans_segs; + IntAtomicCounter* snmp_tcp_in_segs; + IntAtomicCounter* snmp_tcp_out_segs; }; #define DEFINE_FD_COUNTER_METRIC(metric, unit) \ @@ -182,13 +182,13 @@ DEFINE_FD_COUNTER_METRIC(num_used, MetricUnit::NOUNIT); struct FileDescriptorMetrics { FileDescriptorMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, fd_num_limit); - METRIC_REGISTER(entity, fd_num_used); + INT_GAUGE_METRIC_REGISTER(entity, fd_num_limit); + INT_GAUGE_METRIC_REGISTER(entity, fd_num_used); } MetricEntity* entity = nullptr; - IntGauge fd_num_limit; - IntGauge fd_num_used; + IntGauge* fd_num_limit; + IntGauge* fd_num_used; }; const char* SystemMetrics::_s_hook_name = "system_metrics"; @@ -198,24 +198,20 @@ SystemMetrics::SystemMetrics(MetricRegistry* registry, const std::vector& network_interfaces) { DCHECK(registry != nullptr); _registry = registry; -#ifndef BE_TEST - auto entity = DorisMetrics::instance()->server_entity(); -#else - auto entity = _registry->register_entity("server", {}); -#endif - DCHECK(entity != nullptr); - entity->register_hook(_s_hook_name, 
std::bind(&SystemMetrics::update, this)); - _install_cpu_metrics(entity); - _install_memory_metrics(entity); + _server_entity = _registry->register_entity("server"); + DCHECK(_server_entity != nullptr); + _server_entity->register_hook(_s_hook_name, std::bind(&SystemMetrics::update, this)); + _install_cpu_metrics(_server_entity.get()); + _install_memory_metrics(_server_entity.get()); _install_disk_metrics(disk_devices); _install_net_metrics(network_interfaces); - _install_fd_metrics(entity); - _install_snmp_metrics(entity); + _install_fd_metrics(_server_entity.get()); + _install_snmp_metrics(_server_entity.get()); } SystemMetrics::~SystemMetrics() { - DCHECK(_registry != nullptr); - _registry->get_entity("server")->deregister_hook(_s_hook_name); + DCHECK(_server_entity != nullptr); + _server_entity->deregister_hook(_s_hook_name); for (auto& it : _disk_metrics) { delete it.second; @@ -302,14 +298,14 @@ void SystemMetrics::_update_memory_metrics() { size_t allocated_bytes = 0; MallocExtension::instance()->GetNumericProperty( "generic.current_allocated_bytes", &allocated_bytes); - _memory_metrics->memory_allocated_bytes.set_value(allocated_bytes); + _memory_metrics->memory_allocated_bytes->set_value(allocated_bytes); #endif } void SystemMetrics::_install_disk_metrics(const std::set& disk_devices) { for (auto& disk_device : disk_devices) { auto disk_entity = _registry->register_entity(std::string("disk_metrics.") + disk_device, {{"device", disk_device}}); - DiskMetrics* metrics = new DiskMetrics(disk_entity); + DiskMetrics* metrics = new DiskMetrics(disk_entity.get()); _disk_metrics.emplace(disk_device, metrics); } } @@ -368,21 +364,21 @@ void SystemMetrics::_update_disk_metrics() { } // update disk metrics // reads_completed: 4 reads completed successfully - it->second->disk_reads_completed.set_value(values[0]); + it->second->disk_reads_completed->set_value(values[0]); // bytes_read: 6 sectors read * 512; 5 reads merged is ignored - 
it->second->disk_bytes_read.set_value(values[2] * 512); + it->second->disk_bytes_read->set_value(values[2] * 512); // read_time_ms: 7 time spent reading (ms) - it->second->disk_read_time_ms.set_value(values[3]); + it->second->disk_read_time_ms->set_value(values[3]); // writes_completed: 8 writes completed - it->second->disk_writes_completed.set_value(values[4]); + it->second->disk_writes_completed->set_value(values[4]); // bytes_written: 10 sectors write * 512; 9 writes merged is ignored - it->second->disk_bytes_written.set_value(values[6] * 512); + it->second->disk_bytes_written->set_value(values[6] * 512); // write_time_ms: 11 time spent writing (ms) - it->second->disk_write_time_ms.set_value(values[7]); + it->second->disk_write_time_ms->set_value(values[7]); // io_time_ms: 13 time spent doing I/Os (ms) - it->second->disk_io_time_ms.set_value(values[9]); + it->second->disk_io_time_ms->set_value(values[9]); // io_time_weigthed: 14 - weighted time spent doing I/Os (ms) - it->second->disk_io_time_weigthed.set_value(values[10]); + it->second->disk_io_time_weigthed->set_value(values[10]); } if (ferror(fp) != 0) { char buf[64]; @@ -395,7 +391,7 @@ void SystemMetrics::_update_disk_metrics() { void SystemMetrics::_install_net_metrics(const std::vector& interfaces) { for (auto& interface : interfaces) { auto interface_entity = _registry->register_entity(std::string("network_metrics.") + interface, {{"device", interface}}); - NetworkMetrics* metrics = new NetworkMetrics(interface_entity); + NetworkMetrics* metrics = new NetworkMetrics(interface_entity.get()); _network_metrics.emplace(interface, metrics); } } @@ -486,10 +482,10 @@ void SystemMetrics::_update_net_metrics() { default: break; } - it->second->network_receive_bytes.set_value(receive_bytes); - it->second->network_receive_packets.set_value(receive_packets); - it->second->network_send_bytes.set_value(send_bytes); - it->second->network_send_packets.set_value(send_packets); + 
it->second->network_receive_bytes->set_value(receive_bytes); + it->second->network_receive_packets->set_value(receive_packets); + it->second->network_send_bytes->set_value(send_bytes); + it->second->network_send_packets->set_value(send_packets); } if (ferror(fp) != 0) { char buf[64]; @@ -558,10 +554,10 @@ void SystemMetrics::_update_snmp_metrics() { int64_t in_errs = atoi64(metrics[header_map["InErrs"]]); int64_t in_segs = atoi64(metrics[header_map["InSegs"]]); int64_t out_segs = atoi64(metrics[header_map["OutSegs"]]); - _snmp_metrics->snmp_tcp_retrans_segs.set_value(retrans_segs); - _snmp_metrics->snmp_tcp_in_errs.set_value(in_errs); - _snmp_metrics->snmp_tcp_in_segs.set_value(in_segs); - _snmp_metrics->snmp_tcp_out_segs.set_value(out_segs); + _snmp_metrics->snmp_tcp_retrans_segs->set_value(retrans_segs); + _snmp_metrics->snmp_tcp_in_errs->set_value(in_errs); + _snmp_metrics->snmp_tcp_in_segs->set_value(in_segs); + _snmp_metrics->snmp_tcp_out_segs->set_value(out_segs); if (ferror(fp) != 0) { char buf[64]; @@ -599,8 +595,8 @@ void SystemMetrics::_update_fd_metrics() { int num = sscanf(_line_ptr, "%" PRId64 " %" PRId64 " %" PRId64, &values[0], &values[1], &values[2]); if (num == 3) { - _fd_metrics->fd_num_limit.set_value(values[2]); - _fd_metrics->fd_num_used.set_value(values[0] - values[1]); + _fd_metrics->fd_num_limit->set_value(values[2]); + _fd_metrics->fd_num_used->set_value(values[0] - values[1]); } } @@ -616,7 +612,7 @@ int64_t SystemMetrics::get_max_io_util( const std::map& lst_value, int64_t interval_sec) { int64_t max = 0; for (auto& it : _disk_metrics) { - int64_t cur = it.second->disk_io_time_ms.value(); + int64_t cur = it.second->disk_io_time_ms->value(); const auto find = lst_value.find(it.first); if (find == lst_value.end()) { continue; @@ -630,7 +626,7 @@ int64_t SystemMetrics::get_max_io_util( void SystemMetrics::get_disks_io_time(std::map* map) { map->clear(); for (auto& it : _disk_metrics) { - map->emplace(it.first, 
it.second->disk_io_time_ms.value()); + map->emplace(it.first, it.second->disk_io_time_ms->value()); } } @@ -641,8 +637,8 @@ void SystemMetrics::get_network_traffic( rcv_map->clear(); for (auto& it : _network_metrics) { if (it.first == "lo") { continue; } - send_map->emplace(it.first, it.second->network_send_bytes.value()); - rcv_map->emplace(it.first, it.second->network_receive_bytes.value()); + send_map->emplace(it.first, it.second->network_send_bytes->value()); + rcv_map->emplace(it.first, it.second->network_receive_bytes->value()); } } @@ -654,8 +650,8 @@ void SystemMetrics::get_max_net_traffic( int64_t max_send = 0; int64_t max_rcv = 0; for (auto& it : _network_metrics) { - int64_t cur_send = it.second->network_send_bytes.value(); - int64_t cur_rcv = it.second->network_receive_bytes.value(); + int64_t cur_send = it.second->network_send_bytes->value(); + int64_t cur_rcv = it.second->network_receive_bytes->value(); const auto find_send = lst_send_map.find(it.first); if (find_send != lst_send_map.end()) { diff --git a/be/src/util/system_metrics.h b/be/src/util/system_metrics.h index 80ed60446faf76..9f422a52402869 100644 --- a/be/src/util/system_metrics.h +++ b/be/src/util/system_metrics.h @@ -89,6 +89,7 @@ class SystemMetrics { char* _line_ptr = nullptr; size_t _line_buf_size = 0; MetricRegistry* _registry = nullptr; + std::shared_ptr _server_entity = nullptr; }; } diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp index 442c54d1faed5b..8af993a461c607 100644 --- a/be/src/util/thrift_server.cpp +++ b/be/src/util/thrift_server.cpp @@ -226,8 +226,8 @@ void* ThriftServer::ThriftServerEventProcessor::createContext( _thrift_server->_session_handler->session_start(*_session_key); } - _thrift_server->thrift_connections_total.increment(1L); - _thrift_server->thrift_current_connections.increment(1L); + _thrift_server->thrift_connections_total->increment(1L); + _thrift_server->thrift_current_connections->increment(1L); // Store the _session_key in the 
per-client context to avoid recomputing // it. If only this were accessible from RPC method calls, we wouldn't have to @@ -256,7 +256,7 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext( _thrift_server->_session_keys.erase(_session_key); } - _thrift_server->thrift_current_connections.increment(-1L); + _thrift_server->thrift_current_connections->increment(-1L); } ThriftServer::ThriftServer( @@ -275,8 +275,8 @@ ThriftServer::ThriftServer( _processor(processor), _session_handler(NULL) { _thrift_server_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(std::string("thrift_server.") + name, {{"name", name}}); - METRIC_REGISTER(_thrift_server_metric_entity, thrift_current_connections); - METRIC_REGISTER(_thrift_server_metric_entity, thrift_connections_total); + INT_GAUGE_METRIC_REGISTER(_thrift_server_metric_entity, thrift_current_connections); + INT_COUNTER_METRIC_REGISTER(_thrift_server_metric_entity, thrift_connections_total); } Status ThriftServer::start() { diff --git a/be/src/util/thrift_server.h b/be/src/util/thrift_server.h index 779857bcc730e3..66efbb658b8f94 100644 --- a/be/src/util/thrift_server.h +++ b/be/src/util/thrift_server.h @@ -145,11 +145,11 @@ class ThriftServer { class ThriftServerEventProcessor; friend class ThriftServerEventProcessor; - MetricEntity* _thrift_server_metric_entity; + std::shared_ptr _thrift_server_metric_entity; // Number of currently active connections - IntGauge thrift_current_connections; + IntGauge* thrift_current_connections; // Total connections made over the lifetime of this server - IntCounter thrift_connections_total; + IntCounter* thrift_connections_total; }; } diff --git a/be/test/agent/cgroups_mgr_test.cpp b/be/test/agent/cgroups_mgr_test.cpp index f46d1554b5a288..4ffcb4e3c99e5c 100644 --- a/be/test/agent/cgroups_mgr_test.cpp +++ b/be/test/agent/cgroups_mgr_test.cpp @@ -34,6 +34,8 @@ using std::string; namespace doris { +StorageEngine* k_engine = nullptr; + class CgroupsMgrTest : 
public testing::Test { public: // create a mock cgroup folder @@ -41,6 +43,14 @@ class CgroupsMgrTest : public testing::Test { ASSERT_FALSE(boost::filesystem::exists(_s_cgroup_path)); // create a mock cgroup path ASSERT_TRUE(boost::filesystem::create_directory(_s_cgroup_path)); + + std::vector paths; + paths.emplace_back(config::storage_root_path, -1); + + doris::EngineOptions options; + options.store_paths = paths; + Status s = doris::StorageEngine::open(options, &k_engine); + ASSERT_TRUE(s.ok()) << s.to_string(); } // delete the mock cgroup folder diff --git a/be/test/exprs/hybird_set_test.cpp b/be/test/exprs/hybird_set_test.cpp index f8a102ccb0a910..fc884c3f02f0bc 100644 --- a/be/test/exprs/hybird_set_test.cpp +++ b/be/test/exprs/hybird_set_test.cpp @@ -326,6 +326,8 @@ TEST_F(HybirdSetTest, string) { ASSERT_FALSE(set->find(&b)); } TEST_F(HybirdSetTest, timestamp) { + CpuInfo::init(); + HybirdSetBase* set = HybirdSetBase::create_set(TYPE_DATETIME); char s1[] = "2012-01-20 01:10:01"; char s2[] = "1990-10-20 10:10:10.123456 "; diff --git a/be/test/http/metrics_action_test.cpp b/be/test/http/metrics_action_test.cpp index db98454ef83a13..1dc6c72df1a5ca 100644 --- a/be/test/http/metrics_action_test.cpp +++ b/be/test/http/metrics_action_test.cpp @@ -53,18 +53,18 @@ class MetricsActionTest : public testing::Test { TEST_F(MetricsActionTest, prometheus_output) { MetricRegistry metric_registry("test"); - MetricEntity* entity = metric_registry.register_entity("metrics_action_test.prometheus_output", {}); + std::shared_ptr entity = metric_registry.register_entity("metrics_action_test.prometheus_output"); - IntGauge cpu_idle; + IntGauge* cpu_idle = nullptr; DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(cpu_idle, MetricUnit::PERCENT); - METRIC_REGISTER(entity, cpu_idle); + INT_GAUGE_METRIC_REGISTER(entity, cpu_idle); - IntCounter put_requests_total; + IntCounter* put_requests_total = nullptr; DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(put_requests_total, MetricUnit::NOUNIT, "", requests_total, 
Labels({{"type", "put"}, {"path", "/sports"}})); - METRIC_REGISTER(entity, put_requests_total); + INT_COUNTER_METRIC_REGISTER(entity, put_requests_total); - cpu_idle.set_value(50); - put_requests_total.increment(2345); + cpu_idle->set_value(50); + put_requests_total->increment(2345); s_expect_response = "# TYPE test_cpu_idle gauge\n" @@ -78,13 +78,13 @@ TEST_F(MetricsActionTest, prometheus_output) { TEST_F(MetricsActionTest, prometheus_no_prefix) { MetricRegistry metric_registry(""); - MetricEntity* entity = metric_registry.register_entity("metrics_action_test.prometheus_no_prefix", {}); + std::shared_ptr entity = metric_registry.register_entity("metrics_action_test.prometheus_no_prefix"); - IntGauge cpu_idle; + IntGauge* cpu_idle = nullptr; DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(cpu_idle, MetricUnit::PERCENT); - METRIC_REGISTER(entity, cpu_idle); + INT_GAUGE_METRIC_REGISTER(entity, cpu_idle); - cpu_idle.set_value(50); + cpu_idle->set_value(50); s_expect_response = "# TYPE cpu_idle gauge\n" diff --git a/be/test/olap/push_handler_test.cpp b/be/test/olap/push_handler_test.cpp index 2f1465b6fd57fd..39e0108fc776f0 100644 --- a/be/test/olap/push_handler_test.cpp +++ b/be/test/olap/push_handler_test.cpp @@ -34,7 +34,6 @@ class PushHandlerTest : public testing::Test { init(); } static void SetUpTestCase() { - CpuInfo::init(); UserFunctionCache::instance()->init("./be/test/runtime/test_data/user_function_cache/normal"); CastFunctions::init(); } @@ -458,5 +457,6 @@ TEST_F(PushHandlerTest, PushBrokerReaderNormal) { int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); + CpuInfo::init(); return RUN_ALL_TESTS(); } diff --git a/be/test/util/doris_metrics_test.cpp b/be/test/util/doris_metrics_test.cpp index 711eceb6be70f9..1ccd6544ae00c8 100644 --- a/be/test/util/doris_metrics_test.cpp +++ b/be/test/util/doris_metrics_test.cpp @@ -31,152 +31,151 @@ class DorisMetricsTest : public testing::Test { }; TEST_F(DorisMetricsTest, Normal) { - auto metric_registry = 
DorisMetrics::instance()->metric_registry(); - auto server_entity = metric_registry->get_entity("server"); + auto server_entity = DorisMetrics::instance()->server_entity(); // check metric { - DorisMetrics::instance()->fragment_requests_total.increment(12); + DorisMetrics::instance()->fragment_requests_total->increment(12); auto metric = server_entity->get_metric("fragment_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("12", metric->to_string().c_str()); } { - DorisMetrics::instance()->fragment_request_duration_us.increment(101); + DorisMetrics::instance()->fragment_request_duration_us->increment(101); auto metric = server_entity->get_metric("fragment_request_duration_us"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("101", metric->to_string().c_str()); } { - DorisMetrics::instance()->http_requests_total.increment(102); + DorisMetrics::instance()->http_requests_total->increment(102); auto metric = server_entity->get_metric("http_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("102", metric->to_string().c_str()); } { - DorisMetrics::instance()->http_request_send_bytes.increment(104); + DorisMetrics::instance()->http_request_send_bytes->increment(104); auto metric = server_entity->get_metric("http_request_send_bytes"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("104", metric->to_string().c_str()); } { - DorisMetrics::instance()->query_scan_bytes.increment(104); + DorisMetrics::instance()->query_scan_bytes->increment(104); auto metric = server_entity->get_metric("query_scan_bytes"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("104", metric->to_string().c_str()); } { - DorisMetrics::instance()->query_scan_rows.increment(105); + DorisMetrics::instance()->query_scan_rows->increment(105); auto metric = server_entity->get_metric("query_scan_rows"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("105", metric->to_string().c_str()); } { - DorisMetrics::instance()->push_requests_success_total.increment(106); + 
DorisMetrics::instance()->push_requests_success_total->increment(106); auto metric = server_entity->get_metric("push_requests_success_total", "push_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("106", metric->to_string().c_str()); } { - DorisMetrics::instance()->push_requests_fail_total.increment(107); + DorisMetrics::instance()->push_requests_fail_total->increment(107); auto metric = server_entity->get_metric("push_requests_fail_total", "push_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("107", metric->to_string().c_str()); } { - DorisMetrics::instance()->push_request_duration_us.increment(108); + DorisMetrics::instance()->push_request_duration_us->increment(108); auto metric = server_entity->get_metric("push_request_duration_us"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("108", metric->to_string().c_str()); } { - DorisMetrics::instance()->push_request_write_bytes.increment(109); + DorisMetrics::instance()->push_request_write_bytes->increment(109); auto metric = server_entity->get_metric("push_request_write_bytes"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("109", metric->to_string().c_str()); } { - DorisMetrics::instance()->push_request_write_rows.increment(110); + DorisMetrics::instance()->push_request_write_rows->increment(110); auto metric = server_entity->get_metric("push_request_write_rows"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("110", metric->to_string().c_str()); } // engine request { - DorisMetrics::instance()->create_tablet_requests_total.increment(15); + DorisMetrics::instance()->create_tablet_requests_total->increment(15); auto metric = server_entity->get_metric("create_tablet_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("15", metric->to_string().c_str()); } { - DorisMetrics::instance()->drop_tablet_requests_total.increment(16); + DorisMetrics::instance()->drop_tablet_requests_total->increment(16); auto metric = 
server_entity->get_metric("drop_tablet_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("16", metric->to_string().c_str()); } { - DorisMetrics::instance()->report_all_tablets_requests_total.increment(17); + DorisMetrics::instance()->report_all_tablets_requests_total->increment(17); auto metric = server_entity->get_metric("report_all_tablets_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("17", metric->to_string().c_str()); } { - DorisMetrics::instance()->report_tablet_requests_total.increment(18); + DorisMetrics::instance()->report_tablet_requests_total->increment(18); auto metric = server_entity->get_metric("report_tablet_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("18", metric->to_string().c_str()); } { - DorisMetrics::instance()->schema_change_requests_total.increment(19); + DorisMetrics::instance()->schema_change_requests_total->increment(19); auto metric = server_entity->get_metric("schema_change_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("19", metric->to_string().c_str()); } { - DorisMetrics::instance()->create_rollup_requests_total.increment(20); + DorisMetrics::instance()->create_rollup_requests_total->increment(20); auto metric = server_entity->get_metric("create_rollup_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("20", metric->to_string().c_str()); } { - DorisMetrics::instance()->storage_migrate_requests_total.increment(21); + DorisMetrics::instance()->storage_migrate_requests_total->increment(21); auto metric = server_entity->get_metric("storage_migrate_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("21", metric->to_string().c_str()); } { - DorisMetrics::instance()->delete_requests_total.increment(22); + DorisMetrics::instance()->delete_requests_total->increment(22); auto metric = 
server_entity->get_metric("delete_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("22", metric->to_string().c_str()); } // comapction { - DorisMetrics::instance()->base_compaction_deltas_total.increment(30); + DorisMetrics::instance()->base_compaction_deltas_total->increment(30); auto metric = server_entity->get_metric("base_compaction_deltas_total", "compaction_deltas_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("30", metric->to_string().c_str()); } { - DorisMetrics::instance()->cumulative_compaction_deltas_total.increment(31); + DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(31); auto metric = server_entity->get_metric("cumulative_compaction_deltas_total", "compaction_deltas_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("31", metric->to_string().c_str()); } { - DorisMetrics::instance()->base_compaction_bytes_total.increment(32); + DorisMetrics::instance()->base_compaction_bytes_total->increment(32); auto metric = server_entity->get_metric("base_compaction_bytes_total", "compaction_bytes_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("32", metric->to_string().c_str()); } { - DorisMetrics::instance()->cumulative_compaction_bytes_total.increment(33); + DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(33); auto metric = server_entity->get_metric("cumulative_compaction_bytes_total", "compaction_bytes_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("33", metric->to_string().c_str()); } // Gauge { - DorisMetrics::instance()->memory_pool_bytes_total.increment(40); + DorisMetrics::instance()->memory_pool_bytes_total->increment(40); auto metric = server_entity->get_metric("memory_pool_bytes_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("40", metric->to_string().c_str()); diff --git a/be/test/util/new_metrics_test.cpp b/be/test/util/new_metrics_test.cpp index 67b7aea77a189a..c1b4af8eea9455 100644 --- a/be/test/util/new_metrics_test.cpp +++ 
b/be/test/util/new_metrics_test.cpp @@ -205,9 +205,8 @@ TEST_F(MetricsTest, MetricPrototype) { } TEST_F(MetricsTest, MetricEntityWithMetric) { - MetricEntity entity("test_entity", {}); + MetricEntity entity(MetricEntityType::kServer, "test_entity", {}); - IntCounter cpu_idle; MetricPrototype cpu_idle_type(MetricType::COUNTER, MetricUnit::PERCENT, "cpu_idle"); // Before register @@ -215,14 +214,14 @@ TEST_F(MetricsTest, MetricEntityWithMetric) { ASSERT_EQ(nullptr, metric); // Register - entity.register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(12); + IntCounter* cpu_idle = (IntCounter*)entity.register_metric(&cpu_idle_type); + cpu_idle->increment(12); metric = entity.get_metric("cpu_idle"); ASSERT_NE(nullptr, metric); ASSERT_EQ("12", metric->to_string()); - cpu_idle.increment(8); + cpu_idle->increment(8); ASSERT_EQ("20", metric->to_string()); // Deregister @@ -234,15 +233,14 @@ TEST_F(MetricsTest, MetricEntityWithMetric) { } TEST_F(MetricsTest, MetricEntityWithHook) { - MetricEntity entity("test_entity", {}); + MetricEntity entity(MetricEntityType::kServer, "test_entity", {}); - IntCounter cpu_idle; MetricPrototype cpu_idle_type(MetricType::COUNTER, MetricUnit::PERCENT, "cpu_idle"); // Register - entity.register_metric(&cpu_idle_type, &cpu_idle); - entity.register_hook("test_hook", [&cpu_idle]() { - cpu_idle.increment(6); + IntCounter* cpu_idle = (IntCounter*)entity.register_metric(&cpu_idle_type); + entity.register_hook("test_hook", [cpu_idle]() { + cpu_idle->increment(6); }); // Before hook @@ -272,22 +270,30 @@ TEST_F(MetricsTest, MetricRegistryRegister) { ASSERT_EQ("[]", registry.to_json()); ASSERT_EQ("", registry.to_core_string()); - // Before register - auto entity = registry.get_entity("test_entity").get(); - ASSERT_EQ(nullptr, entity); - // Register - entity = registry.register_entity("test_entity", {}); - ASSERT_NE(nullptr, entity); - - // After register - auto entity1 = registry.get_entity("test_entity").get(); + auto entity1 = 
registry.register_entity("test_entity"); ASSERT_NE(nullptr, entity1); - ASSERT_EQ(entity, entity1); - registry.deregister_entity("test_entity"); - entity = registry.get_entity("test_entity").get(); - ASSERT_EQ(nullptr, entity); + // Register again + auto entity2 = registry.register_entity("test_entity"); + ASSERT_NE(nullptr, entity2); + ASSERT_EQ(entity1.get(), entity2.get()); + + // Deregister entity once + registry.deregister_entity(entity1); + + // Still exist and equal to entity1 + entity2 = registry.get_entity("test_entity"); + ASSERT_NE(nullptr, entity2); + ASSERT_EQ(entity1.get(), entity2.get()); + + // Deregister entity twice + registry.deregister_entity(entity2); + + // Not exist and registry is empty + entity2 = registry.get_entity("test_entity"); + ASSERT_EQ(nullptr, entity2); + ASSERT_EQ("", registry.to_prometheus()); } TEST_F(MetricsTest, MetricRegistryOutput) { @@ -302,85 +308,79 @@ TEST_F(MetricsTest, MetricRegistryOutput) { { // Register one common metric to the entity - auto entity = registry.register_entity("test_entity", {}); + auto entity = registry.register_entity("test_entity"); - IntGauge cpu_idle; MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "", {}, true); - entity->register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(8); + IntGauge* cpu_idle = (IntGauge*)entity->register_metric(&cpu_idle_type); + cpu_idle->increment(8); ASSERT_EQ(R"(# TYPE test_registry_cpu_idle gauge test_registry_cpu_idle 8 )", registry.to_prometheus()); ASSERT_EQ(R"([{"tags":{"metric":"cpu_idle"},"unit":"percent","value":8}])", registry.to_json()); ASSERT_EQ("test_registry_cpu_idle LONG 8\n", registry.to_core_string()); - registry.deregister_entity("test_entity"); + registry.deregister_entity(entity); } { // Register one metric with group name to the entity - auto entity = registry.register_entity("test_entity", {}); + auto entity = registry.register_entity("test_entity"); - IntGauge cpu_idle; MetricPrototype 
cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "cpu", {{"mode", "idle"}}, false); - entity->register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(18); + IntGauge* cpu_idle = (IntGauge*)entity->register_metric(&cpu_idle_type); + cpu_idle->increment(18); ASSERT_EQ(R"(# TYPE test_registry_cpu gauge test_registry_cpu{mode="idle"} 18 )", registry.to_prometheus()); ASSERT_EQ(R"([{"tags":{"metric":"cpu","mode":"idle"},"unit":"percent","value":18}])", registry.to_json()); ASSERT_EQ("", registry.to_core_string()); - registry.deregister_entity("test_entity"); + registry.deregister_entity(entity); } { // Register one common metric to an entity with label auto entity = registry.register_entity("test_entity", {{"name", "lable_test"}}); - IntGauge cpu_idle; MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle"); - entity->register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(28); + IntGauge* cpu_idle = (IntGauge*)entity->register_metric(&cpu_idle_type); + cpu_idle->increment(28); ASSERT_EQ(R"(# TYPE test_registry_cpu_idle gauge test_registry_cpu_idle{name="lable_test"} 28 )", registry.to_prometheus()); ASSERT_EQ(R"([{"tags":{"metric":"cpu_idle","name":"lable_test"},"unit":"percent","value":28}])", registry.to_json()); ASSERT_EQ("", registry.to_core_string()); - registry.deregister_entity("test_entity"); + registry.deregister_entity(entity); } { // Register one common metric with group name to an entity with label auto entity = registry.register_entity("test_entity", {{"name", "lable_test"}}); - IntGauge cpu_idle; MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "cpu", {{"mode", "idle"}}); - entity->register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(38); + IntGauge* cpu_idle = (IntGauge*)entity->register_metric(&cpu_idle_type); + cpu_idle->increment(38); ASSERT_EQ(R"(# TYPE test_registry_cpu gauge test_registry_cpu{name="lable_test",mode="idle"} 
38 )", registry.to_prometheus()); ASSERT_EQ(R"([{"tags":{"metric":"cpu","mode":"idle","name":"lable_test"},"unit":"percent","value":38}])", registry.to_json()); ASSERT_EQ("", registry.to_core_string()); - registry.deregister_entity("test_entity"); + registry.deregister_entity(entity); } { // Register two common metrics to one entity - auto entity = registry.register_entity("test_entity", {}); + auto entity = registry.register_entity("test_entity"); - IntGauge cpu_idle; MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "cpu", {{"mode", "idle"}}); - entity->register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(48); + IntGauge* cpu_idle = (IntGauge*)entity->register_metric(&cpu_idle_type); + cpu_idle->increment(48); - IntGauge cpu_guest; MetricPrototype cpu_guest_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_guest", "", "cpu", {{"mode", "guest"}}); - entity->register_metric(&cpu_guest_type, &cpu_guest); - cpu_guest.increment(58); + IntGauge* cpu_guest = (IntGauge*)entity->register_metric(&cpu_guest_type); + cpu_guest->increment(58); ASSERT_EQ(R"(# TYPE test_registry_cpu gauge test_registry_cpu{mode="idle"} 48 @@ -388,7 +388,7 @@ test_registry_cpu{mode="guest"} 58 )", registry.to_prometheus()); ASSERT_EQ(R"([{"tags":{"metric":"cpu","mode":"guest"},"unit":"percent","value":58},{"tags":{"metric":"cpu","mode":"idle"},"unit":"percent","value":48}])", registry.to_json()); ASSERT_EQ("", registry.to_core_string()); - registry.deregister_entity("test_entity"); + registry.deregister_entity(entity); } } } diff --git a/be/test/util/system_metrics_test.cpp b/be/test/util/system_metrics_test.cpp index a3e8cbb6eaed9a..c18215e0928807 100644 --- a/be/test/util/system_metrics_test.cpp +++ b/be/test/util/system_metrics_test.cpp @@ -108,7 +108,7 @@ TEST_F(SystemMetricsTest, normal) { ASSERT_TRUE(memory_allocated_bytes != nullptr); // network - auto net_entity = registry.get_entity("network_metrics.xgbe0"); + auto net_entity = 
registry.get_entity("network_metrics.xgbe0", {{"device", "xgbe0"}}); ASSERT_TRUE(net_entity != nullptr); Metric* receive_bytes = net_entity->get_metric("network_receive_bytes"); @@ -125,7 +125,7 @@ TEST_F(SystemMetricsTest, normal) { ASSERT_STREQ("88277614", send_packets->to_string().c_str()); // disk - auto disk_entity = registry.get_entity("disk_metrics.sda"); + auto disk_entity = registry.get_entity("disk_metrics.sda", {{"device", "sda"}}); ASSERT_TRUE(disk_entity != nullptr); Metric* bytes_read = disk_entity->get_metric("disk_bytes_read"); ASSERT_TRUE(bytes_read != nullptr); @@ -207,13 +207,13 @@ TEST_F(SystemMetricsTest, no_proc_file) { Metric* memory_allocated_bytes = entity->get_metric("memory_allocated_bytes"); ASSERT_TRUE(memory_allocated_bytes != nullptr); // network - auto net_entity = registry.get_entity("network_metrics.xgbe0"); + auto net_entity = registry.get_entity("network_metrics.xgbe0", {{"device", "xgbe0"}}); ASSERT_TRUE(net_entity != nullptr); Metric* receive_bytes = net_entity->get_metric("network_receive_bytes"); ASSERT_TRUE(receive_bytes != nullptr); ASSERT_STREQ("0", receive_bytes->to_string().c_str()); // disk - auto disk_entity = registry.get_entity("disk_metrics.sda"); + auto disk_entity = registry.get_entity("disk_metrics.sda", {{"device", "sda"}}); ASSERT_TRUE(disk_entity != nullptr); Metric* bytes_read = disk_entity->get_metric("disk_bytes_read"); ASSERT_TRUE(bytes_read != nullptr);