Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,7 @@ mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_del_counter("rpc_kv_clean_txn_labe
// get_txn_id
mBvarInt64Adder g_bvar_rpc_kv_get_txn_id_get_counter("rpc_kv_get_txn_id_get_counter",{"instance_id"});

// meta ranges
mBvarStatus<int64_t> g_bvar_fdb_kv_ranges_count("fdb_kv_ranges_count", {"category","instance_id", "sub_category"});

// clang-format on
11 changes: 9 additions & 2 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <bvar/latency_recorder.h>
#include <bvar/multi_dimension.h>
#include <bvar/reducer.h>
#include <bvar/status.h>

#include <cstdint>
#include <initializer_list>
Expand Down Expand Up @@ -139,8 +140,7 @@ class mBvarWrapper {
void put(const std::initializer_list<std::string>& dim_values, ValType value) {
BvarType* stats = counter_.get_stats(std::list<std::string>(dim_values));
if (stats) {
if constexpr (std::is_same_v<BvarType, bvar::Status<double>> ||
std::is_same_v<BvarType, bvar::Status<long>>) {
if constexpr (is_bvar_status<BvarType>::value) {
stats->set_value(value);
} else {
*stats << value;
Expand Down Expand Up @@ -170,6 +170,10 @@ class mBvarWrapper {
struct is_valid_bvar_type<bvar::Status<T>> : std::true_type {};
template <>
struct is_valid_bvar_type<bvar::LatencyRecorder> : std::true_type {};
template <typename T>
struct is_bvar_status : std::false_type {};
template <typename T>
struct is_bvar_status<bvar::Status<T>> : std::true_type {};

bvar::MultiDimension<BvarType> counter_;
};
Expand Down Expand Up @@ -464,3 +468,6 @@ extern mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_clean_txn_label_del_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_txn_id_get_counter;

// meta ranges
extern mBvarStatus<int64_t> g_bvar_fdb_kv_ranges_count;
89 changes: 89 additions & 0 deletions cloud/src/common/metric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@
#include <cstdint>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

#include "common/bvars.h"
#include "common/logging.h"
#include "meta-store/keys.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"

namespace doris::cloud {
extern std::set<std::string> get_key_prefix_contants();

// The format of the output is shown in "test/fdb_metric_example.json"
static const std::string FDB_STATUS_KEY = "\xff\xff/status/json";
Expand Down Expand Up @@ -298,10 +302,95 @@ static void export_fdb_status_details(const std::string& status_str) {
get_process_metric("memory");
}

// boundaries include the key category{meta, txn, recycle...}, instance_id and sub_category{rowset, txn_label...}
// encode look like
// 0x01 "txn" ${instance_id} "txn_label" ${db_id} ${label}
// 0x01 "meta" ${instance_id} "rowset" ${tablet_id} ${version}
// the func count same key to hashmap kv_range_count
// exmaple:
// kv_range_boundaries: meta|instance1|rowset|..., meta|instance1|rowset|..., meta|instance2|rowset|..., txn|instance1|txn_label|...
// kv_range_count output: <meta|instance1|rowset, 2>, <meta|instance2|rowset, 1>, <txn|instance1|txn_label, 1>
void get_kv_range_boundaries_count(std::vector<std::string>& kv_range_boundaries,
std::unordered_map<std::string, size_t>& kv_range_count) {
size_t prefix_size = FdbTxnKv::fdb_partition_key_prefix().size();
for (auto&& boundary : kv_range_boundaries) {
if (boundary.size() < prefix_size + 1 || boundary[prefix_size] != CLOUD_USER_KEY_SPACE01) {
continue;
}

std::string_view user_key(boundary);
user_key.remove_prefix(prefix_size + 1); // Skip the KEY_SPACE prefix.
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
decode_key(&user_key, &out); // ignore any error, since the boundary key might be truncated.

auto visitor = [](auto&& arg) -> std::string {
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, std::string>) {
return arg;
} else {
return std::to_string(arg);
}
};

if (!out.empty()) {
std::string key;
// whatever the boundary's category have similar encode part:
// category, instance_id, sub_category
// we can distinguish boundary using the three parts
// some boundaries do not contain all three parts, so restrictions based on size are also necessary
for (size_t i = 0; i < 3 && i < out.size(); ++i) {
key += std::visit(visitor, std::get<0>(out[i])) + '|';
}
key.pop_back();
kv_range_count[key]++;
}
}
}

static void export_fdb_kv_ranges_details(TxnKv* kv) {
auto* txn_kv = dynamic_cast<FdbTxnKv*>(kv);
if (!txn_kv) {
LOG(WARNING) << "this method only support fdb txn kv";
return;
}

std::vector<std::string> partition_boundaries;
TxnErrorCode code = txn_kv->get_partition_boundaries(&partition_boundaries);
if (code != TxnErrorCode::TXN_OK) {
auto msg = fmt::format("failed to get boundaries, code={}", code);
return;
}

std::unordered_map<std::string, size_t> partition_count;
get_kv_range_boundaries_count(partition_boundaries, partition_count);

auto key_prefix_set = get_key_prefix_contants();
std::unordered_map<std::string, int64_t> category_count;
for (auto&& [key, count] : partition_count) {
std::vector<std::string> keys;
size_t pos {};
// split key with '|'
do {
size_t p = std::min(key.size(), key.find('|', pos));
keys.emplace_back(key.substr(pos, p - pos));
pos = p + 1;
} while (pos < key.size());
keys.resize(3);
if (key_prefix_set.contains(keys[0])) {
category_count[keys[0]] += count;
g_bvar_fdb_kv_ranges_count.put({keys[0], keys[1], keys[2]}, count);
} else {
LOG(WARNING) << fmt::format("Unknow meta range type: {}", keys[0]);
continue;
}
}
}

void FdbMetricExporter::export_fdb_metrics(TxnKv* txn_kv) {
int64_t busyness = 0;
std::string fdb_status = get_fdb_status(txn_kv);
export_fdb_status_details(fdb_status);
export_fdb_kv_ranges_details(txn_kv);
if (auto* kv = dynamic_cast<FdbTxnKv*>(txn_kv); kv != nullptr) {
busyness = static_cast<int64_t>(kv->get_client_thread_busyness() * 100);
g_bvar_fdb_client_thread_busyness_percent.set_value(busyness);
Expand Down
2 changes: 2 additions & 0 deletions cloud/src/common/metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "meta-store/txn_kv.h"

namespace doris::cloud {
extern void get_kv_range_boundaries_count(std::vector<std::string>& partition_boundaries,
std::unordered_map<std::string, size_t>& partition_count);

class FdbMetricExporter {
public:
Expand Down
37 changes: 5 additions & 32 deletions cloud/src/meta-service/meta_service_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ extern int decrypt_instance_info(InstanceInfoPB& instance, const std::string& in
MetaServiceCode& code, std::string& msg,
std::shared_ptr<Transaction>& txn);

extern void get_kv_range_boundaries_count(std::vector<std::string>& partition_boundaries,
std::unordered_map<std::string, size_t>& partition_count);

template <typename Message>
static google::protobuf::util::Status parse_json_message(const std::string& unresolved_path,
const std::string& body, Message* req) {
Expand Down Expand Up @@ -532,38 +535,8 @@ static HttpResponse process_show_meta_ranges(MetaServiceImpl* service, brpc::Con
auto msg = fmt::format("failed to get boundaries, code={}", code);
return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg);
}

std::unordered_map<std::string, size_t> partition_count;
size_t prefix_size = FdbTxnKv::fdb_partition_key_prefix().size();
for (auto&& boundary : partition_boundaries) {
if (boundary.size() < prefix_size + 1 || boundary[prefix_size] != CLOUD_USER_KEY_SPACE01) {
continue;
}

std::string_view user_key(boundary);
user_key.remove_prefix(prefix_size + 1); // Skip the KEY_SPACE prefix.
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
decode_key(&user_key, &out); // ignore any error, since the boundary key might be truncated.

auto visitor = [](auto&& arg) -> std::string {
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, std::string>) {
return arg;
} else {
return std::to_string(arg);
}
};

if (!out.empty()) {
std::string key;
for (size_t i = 0; i < 3 && i < out.size(); ++i) {
key += std::visit(visitor, std::get<0>(out[i]));
key += '|';
}
key.pop_back(); // omit the last '|'
partition_count[key]++;
}
}
get_kv_range_boundaries_count(partition_boundaries, partition_count);

// sort ranges by count
std::vector<std::pair<std::string, size_t>> meta_ranges;
Expand All @@ -575,7 +548,7 @@ static HttpResponse process_show_meta_ranges(MetaServiceImpl* service, brpc::Con
std::sort(meta_ranges.begin(), meta_ranges.end(),
[](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; });

std::string body = fmt::format("total partitions: {}\n", partition_boundaries.size());
std::string body = fmt::format("total meta ranges: {}\n", partition_boundaries.size());
for (auto&& [key, count] : meta_ranges) {
body += fmt::format("{}: {}\n", key, count);
}
Expand Down
19 changes: 18 additions & 1 deletion cloud/src/meta-store/keys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "meta-store/keys.h"

#include <set>

#include "meta-store/codec.h"

namespace doris::cloud {
Expand Down Expand Up @@ -526,5 +528,20 @@ int decode_key(std::string_view* in,
}
return 0;
}

//==================================================================================
// Key Prefix Map
//==================================================================================
std::set<std::string> get_key_prefix_contants() {
std::set<std::string> key_prefix_set;
key_prefix_set.insert(INSTANCE_KEY_PREFIX);
key_prefix_set.insert(TXN_KEY_PREFIX);
key_prefix_set.insert(VERSION_KEY_PREFIX);
key_prefix_set.insert(META_KEY_PREFIX);
key_prefix_set.insert(RECYCLE_KEY_PREFIX);
key_prefix_set.insert(STATS_KEY_PREFIX);
key_prefix_set.insert(JOB_KEY_PREFIX);
key_prefix_set.insert(COPY_KEY_PREFIX);
key_prefix_set.insert(VAULT_KEY_PREFIX);
return key_prefix_set;
}
} // namespace doris::cloud
Loading