From ee769bb1aff2029efc403a451defd6d9d4365aae Mon Sep 17 00:00:00 2001 From: Uniqueyou Date: Wed, 18 Jun 2025 17:48:06 +0800 Subject: [PATCH] [Feature](recycler) Add recycler metrics for recycler layer (#51409) --- cloud/src/common/bvars.cpp | 13 ++++ cloud/src/common/bvars.h | 107 ++++++++++++++++++++++++++++++++ cloud/src/main.cpp | 2 + cloud/src/recycler/recycler.cpp | 25 +++++++- 4 files changed, 144 insertions(+), 3 deletions(-) diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index 6eb4c31a670b09..ad37a21cc15574 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -17,6 +17,10 @@ #include "common/bvars.h" +#include +#include +#include + #include #include @@ -98,6 +102,15 @@ BvarStatusWithTag g_bvar_recycler_recycle_partition_earlest_ts("recycle BvarStatusWithTag g_bvar_recycler_recycle_rowset_earlest_ts("recycler", "recycle_rowset_earlest_ts"); BvarStatusWithTag g_bvar_recycler_recycle_tmp_rowset_earlest_ts("recycler", "recycle_tmp_rowset_earlest_ts"); BvarStatusWithTag g_bvar_recycler_recycle_expired_txn_label_earlest_ts("recycler", "recycle_expired_txn_label_earlest_ts"); +bvar::Status g_bvar_recycler_task_max_concurrency("recycler_task_max_concurrency_num",0); +bvar::Adder g_bvar_recycler_task_concurrency; + +// recycler's mbvars +mBvarIntAdder g_bvar_recycler_instance_running("recycler_instance_running",{"instance_id"}); +mBvarLongStatus g_bvar_recycler_instance_last_recycle_duration("recycler_instance_last_recycle_duration_ms",{"instance_id"}); +mBvarLongStatus g_bvar_recycler_instance_next_time("recycler_instance_next_time_s",{"instance_id"}); +mBvarPairStatus g_bvar_recycler_instance_recycle_times("recycler_instance_recycle_times",{"instance_id"}); +mBvarLongStatus g_bvar_recycler_instance_recycle_last_success_times("recycler_instance_recycle_last_success_times",{"instance_id"}); // txn_kv's bvars bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get"); diff --git a/cloud/src/common/bvars.h 
b/cloud/src/common/bvars.h index dce05dc3c57d95..d7ff99da329742 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include #include @@ -27,6 +29,7 @@ #include #include #include +#include /** * Manage bvars that with similar names (identical prefix) @@ -97,6 +100,102 @@ template requires std::is_integral_v using BvarStatusWithTag = BvarWithTag, true>; +/** +@brief: A wrapper class for multidimensional bvar metrics. +This template class provides a convenient interface for managing multidimensional +bvar metrics. It supports various bvar types including Adder, IntRecorder, +LatencyRecorder, Maxer, and Status. +@param: BvarType The type of bvar metric to use (must be one of the supported types) +@output: Based on the bvar multidimensional counter implementation, +the metrics output format would typically follow this structure: +{metric_name}{dimension1="value1",dimension2="value2",...} value +@example: Basic usage with an Adder: +// Create a 2-dimensional counter with dimensions "region" and "service" +mBvarWrapper<bvar::Adder<int>> request_counter("xxx_request_count", {"region", "service"}); +// Increment the counter for specific dimension values +request_counter.put({"east", "login"}, 1); +request_counter.put({"west", "search"}, 1); +request_counter.put({"east", "login"}, 1); // Now east/login has value 2 +// the output of above metrics: +xxx_request_count{region="east",service="login"} 2 +xxx_request_count{region="west",service="search"} 1 +@note: The dimensions provided in the constructor and the values provided to +put() and get() methods must match in count. Also, all supported bvar types +have different behaviors for how values are processed and retrieved. 
+*/ +template +class mBvarWrapper { +public: + mBvarWrapper(const std::string& metric_name, + const std::initializer_list& dim_names) + : counter_(metric_name, std::list(dim_names)) { + static_assert(is_valid_bvar_type::value, + "BvarType must be one of the supported bvar types (Adder, IntRecorder, " + "LatencyRecorder, Maxer, Status)"); + } + + template + void put(const std::initializer_list& dim_values, ValType value) { + BvarType* stats = counter_.get_stats(std::list(dim_values)); + if (stats) { + if constexpr (std::is_same_v> || + std::is_same_v> || + is_pair_status::value) { + stats->set_value(value); + } else { + *stats << value; + } + } + } + + auto get(const std::initializer_list& dim_values) { + BvarType* stats = counter_.get_stats(std::list(dim_values)); + if (stats) { + return stats->get_value(); + } + return decltype(stats->get_value()){}; /* stats is null: return a value-initialized result of the same type as get_value(); std::declval is declaration-only and takes no arguments */ + } + +private: + template + struct is_valid_bvar_type : std::false_type {}; + template + struct is_pair_status : std::false_type {}; + template + struct is_valid_bvar_type> : std::true_type {}; + template <> + struct is_valid_bvar_type : std::true_type {}; + template + struct is_valid_bvar_type> : std::true_type {}; + template + struct is_valid_bvar_type> : std::true_type {}; + template + struct is_pair_status>> : std::true_type {}; + template <> + struct is_valid_bvar_type : std::true_type {}; + + bvar::MultiDimension counter_; +}; + +using mBvarIntAdder = mBvarWrapper>; +using mBvarDoubleAdder = mBvarWrapper>; +using mBvarIntRecorder = mBvarWrapper; +using mBvarLatencyRecorder = mBvarWrapper; +using mBvarIntMaxer = mBvarWrapper>; +using mBvarDoubleMaxer = mBvarWrapper>; +using mBvarLongStatus = mBvarWrapper>; +using mBvarDoubleStatus = mBvarWrapper>; + +namespace std { +template +inline std::ostream& operator<<(std::ostream& os, const std::pair& p) { + return os << "{" << p.first << "," << p.second << "}"; +} +} // namespace std + +template +using mBvarPairStatus = mBvarWrapper>>; + // meta-service's bvars extern 
BvarLatencyRecorderWithTag g_bvar_ms_begin_txn; extern BvarLatencyRecorderWithTag g_bvar_ms_precommit_txn; @@ -171,6 +270,14 @@ extern BvarStatusWithTag g_bvar_recycler_recycle_rowset_earlest_ts; extern BvarStatusWithTag g_bvar_recycler_recycle_tmp_rowset_earlest_ts; extern BvarStatusWithTag g_bvar_recycler_recycle_expired_txn_label_earlest_ts; +extern bvar::Status g_bvar_recycler_task_max_concurrency; +extern bvar::Adder g_bvar_recycler_task_concurrency; +extern mBvarIntAdder g_bvar_recycler_instance_running; +extern mBvarLongStatus g_bvar_recycler_instance_last_recycle_duration; +extern mBvarLongStatus g_bvar_recycler_instance_next_time; +extern mBvarPairStatus g_bvar_recycler_instance_recycle_times; +extern mBvarLongStatus g_bvar_recycler_instance_recycle_last_success_times; + // txn_kv's bvars extern bvar::LatencyRecorder g_bvar_txn_kv_get; extern bvar::LatencyRecorder g_bvar_txn_kv_range_get; diff --git a/cloud/src/main.cpp b/cloud/src/main.cpp index 9115158743f20c..18cf98720e9cb0 100644 --- a/cloud/src/main.cpp +++ b/cloud/src/main.cpp @@ -236,6 +236,8 @@ int main(int argc, char** argv) { std::cout << "try to start meta_service, recycler" << std::endl; } + google::SetCommandLineOption("bvar_max_dump_multi_dimension_metric_number", "2000"); + brpc::Server server; brpc::FLAGS_max_body_size = config::brpc_max_body_size; brpc::FLAGS_socket_max_unwritten_bytes = config::brpc_socket_max_unwritten_bytes; diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index a263d747b60f7d..e63f1016db3a76 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -17,8 +17,10 @@ #include "recycler/recycler.h" +#include #include #include +#include #include #include @@ -27,9 +29,11 @@ #include #include #include +#include #include #include #include +#include #include "common/stopwatch.h" #include "meta-service/meta_service.h" @@ -275,7 +279,12 @@ void Recycler::recycle_callback() { if (stopped()) return; LOG_INFO("begin to 
recycle instance").tag("instance_id", instance_id); auto ctime_ms = duration_cast(system_clock::now().time_since_epoch()).count(); + g_bvar_recycler_task_concurrency << 1; + g_bvar_recycler_instance_running.put({instance_id}, 1); + g_bvar_recycler_instance_recycle_times.put({instance_id}, std::make_pair(ctime_ms, -1)); ret = instance_recycler->do_recycle(); + g_bvar_recycler_task_concurrency << -1; + g_bvar_recycler_instance_running.put({instance_id}, -1); // If instance recycler has been aborted, don't finish this job if (!instance_recycler->stopped()) { finish_instance_recycle_job(txn_kv_.get(), recycle_job_key, instance_id, ip_port_, @@ -285,9 +294,18 @@ void Recycler::recycle_callback() { std::lock_guard lock(mtx_); recycling_instance_map_.erase(instance_id); } - auto elpased_ms = - duration_cast(system_clock::now().time_since_epoch()).count() - - ctime_ms; + auto now = duration_cast(system_clock::now().time_since_epoch()).count(); + auto elpased_ms = now - ctime_ms; + g_bvar_recycler_instance_recycle_times.put({instance_id}, std::make_pair(ctime_ms, now)); + g_bvar_recycler_instance_last_recycle_duration.put({instance_id}, elpased_ms); + g_bvar_recycler_instance_next_time.put({instance_id}, + now + config::recycle_interval_seconds * 1000); + LOG(INFO) << "recycle instance done, " + << "instance_id=" << instance_id << " ret=" << ret << " ctime_ms: " << ctime_ms + << " now: " << now; + + g_bvar_recycler_instance_recycle_last_success_times.put({instance_id}, now); + LOG_INFO("finish recycle instance") .tag("instance_id", instance_id) .tag("cost_ms", elpased_ms); @@ -344,6 +362,7 @@ void Recycler::check_recycle_tasks() { int Recycler::start(brpc::Server* server) { instance_filter_.reset(config::recycle_whitelist, config::recycle_blacklist); + g_bvar_recycler_task_max_concurrency.set_value(config::recycle_concurrency); if (config::enable_checker) { checker_ = std::make_unique(txn_kv_);