From 4f7e8306b14756e582c1e814cdf9dae0db2e6c17 Mon Sep 17 00:00:00 2001 From: Siyang Tang Date: Thu, 24 Oct 2024 18:01:13 +0800 Subject: [PATCH 1/3] [feature](meta-service) Support querying and adjusting rpc qps limit on meta service --- cloud/src/meta-service/meta_service_http.cpp | 74 +++++++++- cloud/src/rate-limiter/rate_limiter.cpp | 95 +++++++++++-- cloud/src/rate-limiter/rate_limiter.h | 24 +++- cloud/test/meta_service_http_test.cpp | 83 +++++++++++ cloud/test/rate_limiter_test.cpp | 139 +++++++++++++++---- 5 files changed, 374 insertions(+), 41 deletions(-) diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp index 95907376dd28c0..eb28b3bc5a6a7b 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -30,8 +30,11 @@ #include #include +#include #include #include +#include +#include #include #include #include @@ -42,6 +45,7 @@ #include "meta-service/txn_kv.h" #include "meta-service/txn_kv_error.h" #include "meta_service.h" +#include "rate-limiter/rate_limiter.h" namespace doris::cloud { @@ -333,6 +337,70 @@ static HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller return http_json_reply(resp.status()); } +static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + bool is_attr_set = false; + int64_t default_qps_limit = -1; + if (auto default_qps_limit_str = std::string(http_query(uri, "default_qps_limit")); + !default_qps_limit_str.empty()) { + try { + default_qps_limit = std::stoll(default_qps_limit_str); + } catch (const std::exception& ex) { + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + fmt::format("param `default_qps_limit` is not a legal int64 type:{}", + ex.what())); + } + if (default_qps_limit < 0) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "`default_qps_limit` should not be less than 0"); + } + is_attr_set |= true; + } + std::string_view specific_max_qps_limit = http_query(uri, "specific_max_qps_limit"); + is_attr_set |= (!specific_max_qps_limit.empty()); + if (!is_attr_set) { + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + "default_qps_limit(int64) or " + "specific_max_qps_limit(list of[rpcname:qps(int64);]) is required as query param"); + } + auto rate_limiter = service->rate_limiter(); + rate_limiter->reset_rate_limit(service, default_qps_limit, specific_max_qps_limit.data()); + return http_json_reply(MetaServiceCode::OK, "success to adjust rate limit"); +} + +static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + auto rpc_name = std::string(http_query(uri, "rpc_name")); + auto rate_limiter = service->rate_limiter(); + rapidjson::Document d; + if (rpc_name.empty()) { + auto get_qps_limit = [&d](std::string_view rpc_name, + std::shared_ptr rpc_limiter) { + auto max_qps_limit = std::to_string(rpc_limiter->max_qps_limit()); + d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), + rapidjson::StringRef(max_qps_limit.data(), max_qps_limit.size()), + d.GetAllocator()); + }; + rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit)); + } else { + auto rpc_limiter = rate_limiter->get_rpc_rate_limiter(rpc_name); + if (rpc_limiter == nullptr) [[unlikely]] { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("rpc_name={} is not exists", rpc_name)); + } + auto max_qps_limit = std::to_string(rpc_limiter->max_qps_limit()); + d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), + rapidjson::StringRef(max_qps_limit.data(), max_qps_limit.size()), + d.GetAllocator()); + } + rapidjson::StringBuffer sb; + rapidjson::PrettyWriter writer(sb); + d.Accept(writer); + return http_json_reply(MetaServiceCode::OK, sb.GetString()); +} + static HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl) { auto& uri = ctrl->http_request().uri(); std::string_view key = http_query(uri, "key"); @@ -615,13 +683,17 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, {"abort_tablet_job", process_abort_tablet_job}, {"alter_ram_user", process_alter_ram_user}, {"alter_iam", process_alter_iam}, + {"adjust_rate_limit", process_adjust_rate_limit}, + {"query_rate_limit", process_query_rate_limit}, {"v1/abort_txn", process_abort_txn}, {"v1/abort_tablet_job", process_abort_tablet_job}, {"v1/alter_ram_user", process_alter_ram_user}, {"v1/alter_iam", process_alter_iam}, + {"v1/adjust_rate_limit", process_adjust_rate_limit}, + {"v1/query_rate_limit", process_query_rate_limit}, }; - auto cntl = static_cast(controller); + auto* cntl = static_cast(controller); brpc::ClosureGuard closure_guard(done); // Prepare input request info diff --git a/cloud/src/rate-limiter/rate_limiter.cpp b/cloud/src/rate-limiter/rate_limiter.cpp index 8988ff0560b170..b7151a1e9e8535 100644 --- a/cloud/src/rate-limiter/rate_limiter.cpp +++ b/cloud/src/rate-limiter/rate_limiter.cpp @@ -20,8 +20,10 @@ #include #include +#include #include #include +#include #include "common/bvars.h" #include "common/config.h" @@ -29,10 +31,10 @@ namespace doris::cloud { -void RateLimiter::init(google::protobuf::Service* service) { - std::map rpc_name_to_max_qps_limit; +std::unordered_map parse_specific_qps_limit(const std::string& list_str) { + std::unordered_map rpc_name_to_max_qps_limit; std::vector max_qps_limit_list; - butil::SplitString(config::specific_max_qps_limit, ';', &max_qps_limit_list); + butil::SplitString(list_str, ';', &max_qps_limit_list); for (const auto& v : max_qps_limit_list) { auto p = v.find(':'); if (p != std::string::npos && p != (v.size() - 1)) { @@ -41,29 +43,43 @@ void RateLimiter::init(google::protobuf::Service* service) { int64_t max_qps_limit = std::stoll(v.substr(p + 1)); if (max_qps_limit > 0) { rpc_name_to_max_qps_limit[rpc_name] = max_qps_limit; - LOG(INFO) << "set rpc: " << rpc_name << " max_qps_limit: " << max_qps_limit; } } catch (...) { - LOG(WARNING) << "failed to set max_qps_limit to rpc: " << rpc_name + LOG(WARNING) << "failed to parse max_qps_limit to rpc: " << rpc_name << " config: " << v; } } } + return rpc_name_to_max_qps_limit; +} + +template +void for_each_rpc_name(google::protobuf::Service* service, Callable cb) { auto method_size = service->GetDescriptor()->method_count(); for (auto i = 0; i < method_size; ++i) { std::string rpc_name = service->GetDescriptor()->method(i)->name(); - int64_t max_qps_limit = config::default_max_qps_limit; + cb(rpc_name); + } +} - auto it = rpc_name_to_max_qps_limit.find(rpc_name); - if (it != rpc_name_to_max_qps_limit.end()) { +void RateLimiter::init(google::protobuf::Service* service) { + auto rpc_name_to_specific_limit = parse_specific_qps_limit(config::specific_max_qps_limit); + std::unique_lock write_lock(shared_mtx_); + for_each_rpc_name(service, [&](const std::string& rpc_name) { + auto it = rpc_name_to_specific_limit.find(rpc_name); + int64_t max_qps_limit = config::default_max_qps_limit; + if (it != rpc_name_to_specific_limit.end()) { max_qps_limit = it->second; } limiters_[rpc_name] = std::make_shared(rpc_name, max_qps_limit); + }); + for (const auto& [k, _] : rpc_name_to_specific_limit) { + rpc_with_specific_limit_.insert(k); } } std::shared_ptr RateLimiter::get_rpc_rate_limiter(const std::string& rpc_name) { - // no need to be locked, because it is only modified during initialization + std::shared_lock read_lock(shared_mtx_); auto it = limiters_.find(rpc_name); if (it == limiters_.end()) { return nullptr; @@ -71,6 +87,52 @@ std::shared_ptr RateLimiter::get_rpc_rate_limiter(const std::str return it->second; } +void RateLimiter::reset_rate_limit(google::protobuf::Service* service, int64_t default_qps_limit, + const std::string& specific_max_qps_limit) { + // TODO: merge specific_max_qps_limit + auto specific_limits = parse_specific_qps_limit(specific_max_qps_limit); + + auto reset_specific_limit = [&](const std::string& rpc_name) -> bool { + if (auto it = specific_limits.find(rpc_name); it != specific_limits.end()) { + limiters_[rpc_name]->reset_max_qps_limit(it->second); + return true; + } + return false; + }; + auto reset_default_limit = [&](const std::string& rpc_name) { + if (rpc_with_specific_limit_.contains(rpc_name)) { + return; + } + limiters_[rpc_name]->reset_max_qps_limit(default_qps_limit); + }; + + std::unique_lock write_lock(shared_mtx_); + for (const auto& [k, _] : specific_limits) { + rpc_with_specific_limit_.insert(k); + } + if (default_qps_limit < 0) { + for_each_rpc_name(service, std::move(reset_specific_limit)); + return; + } + if (specific_limits.empty()) { + for_each_rpc_name(service, std::move(reset_default_limit)); + return; + } + for_each_rpc_name(service, [&](const std::string& rpc_name) { + if (reset_specific_limit(rpc_name)) { + return; + } + reset_default_limit(rpc_name); + }); +} + +void RateLimiter::for_each_rpc_limiter( + std::function)> cb) { + for (const auto& [rpc_name, rpc_limiter] : limiters_) { + cb(rpc_name, rpc_limiter); + } +} + bool RpcRateLimiter::get_qps_token(const std::string& instance_id, std::function& get_bvar_qps) { if (!config::use_detailed_metrics || instance_id.empty()) { @@ -93,6 +155,14 @@ bool RpcRateLimiter::get_qps_token(const std::string& instance_id, return qps_token->get_token(get_bvar_qps); } +void RpcRateLimiter::reset_max_qps_limit(int64_t max_qps_limit) { + std::lock_guard l(mutex_); + max_qps_limit_ = max_qps_limit; + for (auto& [_, v] : qps_limiter_) { + v->reset_max_qps_limit(max_qps_limit); + } +} + bool RpcRateLimiter::QpsToken::get_token(std::function& get_bvar_qps) { using namespace std::chrono; auto now = steady_clock::now(); @@ -110,4 +180,9 @@ bool RpcRateLimiter::QpsToken::get_token(std::function& get_bvar_qps) { return current_qps_ < max_qps_limit_; } -} // namespace doris::cloud \ No newline at end of file +void RpcRateLimiter::QpsToken::reset_max_qps_limit(int64_t max_qps_limit) { + std::lock_guard l(mutex_); + max_qps_limit_ = max_qps_limit; +} + +} // namespace doris::cloud diff --git a/cloud/src/rate-limiter/rate_limiter.h b/cloud/src/rate-limiter/rate_limiter.h index df441656aa45f3..7909f98188feb3 100644 --- a/cloud/src/rate-limiter/rate_limiter.h +++ b/cloud/src/rate-limiter/rate_limiter.h @@ -19,10 +19,13 @@ #include #include +#include #include #include +#include #include +#include #include #include "common/config.h" @@ -35,12 +38,22 @@ class RateLimiter { public: RateLimiter() = default; ~RateLimiter() = default; + void init(google::protobuf::Service* service); + std::shared_ptr get_rpc_rate_limiter(const std::string& rpc_name); + void reset_rate_limit(google::protobuf::Service* service, int64_t default_qps_limit, + const std::string& specific_max_qps_limit); + + void for_each_rpc_limiter( + std::function)> cb); + private: // rpc_name -> RpcRateLimiter std::unordered_map> limiters_; + std::unordered_set rpc_with_specific_limit_; + std::shared_mutex shared_mtx_; }; class RpcRateLimiter { @@ -58,6 +71,12 @@ class RpcRateLimiter { */ bool get_qps_token(const std::string& instance_id, std::function& get_bvar_qps); + std::string_view rpc_name() const { return rpc_name_; } + + int64_t max_qps_limit() const { return max_qps_limit_; } + + void reset_max_qps_limit(int64_t max_qps_limit); + // Todo: Recycle outdated instance_id private: @@ -67,6 +86,8 @@ class RpcRateLimiter { bool get_token(std::function& get_bvar_qps); + void reset_max_qps_limit(int64_t max_qps_limit); + private: bthread::Mutex mutex_; std::chrono::steady_clock::time_point last_update_time_; @@ -75,7 +96,6 @@ class RpcRateLimiter { int64_t max_qps_limit_; }; -private: bthread::Mutex mutex_; // instance_id -> QpsToken std::unordered_map> qps_limiter_; @@ -83,4 +103,4 @@ class RpcRateLimiter { int64_t max_qps_limit_; }; -} // namespace doris::cloud \ No newline at end of file +} // namespace doris::cloud diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index d1b8fd66943a20..37791a8f0aa742 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -1535,4 +1535,87 @@ TEST(MetaServiceHttpTest, get_obj_store_info_response_sk) { ms->get_obj_store_info(&cntl, &req1, &res1, nullptr); } +TEST(MetaServiceHttpTest, AdjustRateLimit) { + HttpContext ctx; + { + auto [status_code, content] = ctx.query( + "adjust_rate_limit", + "default_qps_limit=10000&specific_max_qps_limit=get_cluster:10000"); + ASSERT_EQ(status_code, 200); + } + { + auto [status_code, content] = + ctx.query("adjust_rate_limit", "default_qps_limit=10000"); + ASSERT_EQ(status_code, 200); + } + { + auto [status_code, content] = ctx.query( + "adjust_rate_limit", "specific_max_qps_limit=get_cluster:10000"); + ASSERT_EQ(status_code, 200); + } + { + auto [status_code, content] = ctx.query("adjust_rate_limit", ""); + ASSERT_EQ(status_code, 400); + std::string msg = + "default_qps_limit(int64) or specific_max_qps_limit(list of[rpcname:qps(int64);]) " + "is required as query param"; + ASSERT_TRUE(content.find(msg) != std::string::npos); + } + { + auto [status_code, content] = ctx.query("adjust_rate_limit", "key=abc"); + ASSERT_EQ(status_code, 400); + std::string msg = + "default_qps_limit(int64) or specific_max_qps_limit(list of[rpcname:qps(int64);]) " + "is required as query param"; + ASSERT_TRUE(content.find(msg) != std::string::npos); + } + { + auto [status_code, content] = + ctx.query("adjust_rate_limit", "default_qps_limit=invalid"); + ASSERT_EQ(status_code, 400); + std::string msg = "param `qps_limit` is not a legal int64 type:"; + ASSERT_TRUE(content.find(msg) != std::string::npos); + } + { + auto [status_code, content] = ctx.query( + "adjust_rate_limit", + "specific_max_qps_limit=get_cluster:10000&default_qps_limit=invalid"); + ASSERT_EQ(status_code, 400); + std::string msg = "param `qps_limit` is not a legal int64 type:"; + ASSERT_TRUE(content.find(msg) != std::string::npos); + } + { + auto [status_code, content] = ctx.query( + "adjust_rate_limit", + "specific_max_qps_limit=get_cluster:invalid&default_qps_limit=10000"); + // note: invalid so will not take effect, but return ok, by design + ASSERT_EQ(status_code, 200); + } + { + auto [status_code, content] = ctx.query( + "adjust_rate_limit", "specific_max_qps_limit=xxx:10000&default_qps_limit=10000"); + // note: invalid so will not take effect, but return ok, by design + ASSERT_EQ(status_code, 200); + } +} + +TEST(MetaServiceHttpTest, QueryRateLimit) { + HttpContext ctx; + { + auto [status_code, content] = ctx.query("query_rate_limit", ""); + ASSERT_EQ(status_code, 200); + } + { + auto [status_code, content] = + ctx.query("query_rate_limit", "rpc_name=get_cluster"); + ASSERT_EQ(status_code, 200); + } + { + auto [status_code, content] = ctx.query("query_rate_limit", "rpc_name=xxx"); + ASSERT_EQ(status_code, 400); + std::string msg = "rpc_name=xxx is not exists"; + ASSERT_TRUE(content.find(msg) != std::string::npos); + } +} + } // namespace doris::cloud diff --git a/cloud/test/rate_limiter_test.cpp b/cloud/test/rate_limiter_test.cpp index 2a10451a69fa1b..0ff0565178532c 100644 --- a/cloud/test/rate_limiter_test.cpp +++ b/cloud/test/rate_limiter_test.cpp @@ -17,8 +17,11 @@ #include "rate-limiter/rate_limiter.h" +#include #include +#include + #include "common/config.h" #include "common/util.h" #include "meta-service/keys.h" @@ -44,8 +47,7 @@ std::unique_ptr get_meta_service() { return std::make_unique(std::move(meta_service)); } -TEST(RateLimiterTest, RateLimitGetClusterTest) { - auto meta_service = get_meta_service(); +void mock_add_cluster(MetaServiceProxy& meta_service) { // add cluster first InstanceKeyInfo key_info {mock_instance}; std::string key; @@ -63,50 +65,131 @@ TEST(RateLimiterTest, RateLimitGetClusterTest) { std::unique_ptr txn; std::string get_val; - ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(meta_service.txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->put(key, val); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); +} + +void mock_get_cluster(MetaServiceProxy& meta_service, MetaServiceCode code) { + GetClusterRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_cluster_id(mock_cluster_id); + req.set_cluster_name(mock_cluster_name); + brpc::Controller cntl; + GetClusterResponse res; + meta_service.get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, + &res, nullptr); - auto get_cluster = [&](MetaServiceCode code) { - GetClusterRequest req; - req.set_cloud_unique_id("test_cloud_unique_id"); - req.set_cluster_id(mock_cluster_id); - req.set_cluster_name(mock_cluster_name); - brpc::Controller cntl; - GetClusterResponse res; - meta_service->get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, - &res, nullptr); - - ASSERT_EQ(res.status().code(), code); - }; + ASSERT_EQ(res.status().code(), code); +} + +template +void mock_parallel_rpc(Rpc rpc, MetaServiceProxy* meta_service, MetaServiceCode expected, + size_t times) { std::vector threads; - for (int i = 0; i < 20; ++i) { - threads.emplace_back(get_cluster, MetaServiceCode::OK); + for (size_t i = 0; i < times; ++i) { + threads.emplace_back([&]() { rpc(*meta_service, expected); }); } for (auto& t : threads) { t.join(); } - threads.clear(); +} + +TEST(RateLimiterTest, RateLimitGetClusterTest) { + auto meta_service = get_meta_service(); + mock_add_cluster(*meta_service); + + mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 20); std::this_thread::sleep_for(std::chrono::seconds(1)); meta_service->rate_limiter() ->get_rpc_rate_limiter("get_cluster") ->qps_limiter_[mock_instance] ->max_qps_limit_ = 1; - threads.emplace_back(get_cluster, MetaServiceCode::MAX_QPS_LIMIT); - for (auto& t : threads) { - t.join(); - } - threads.clear(); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::MAX_QPS_LIMIT, 1); std::this_thread::sleep_for(std::chrono::seconds(1)); meta_service->rate_limiter() ->get_rpc_rate_limiter("get_cluster") ->qps_limiter_[mock_instance] ->max_qps_limit_ = 10000; - threads.emplace_back(get_cluster, MetaServiceCode::OK); - for (auto& t : threads) { - t.join(); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 1); +} + +TEST(RateLimiterTest, AdjustSpecificLimitTest) { + auto meta_service = get_meta_service(); + mock_add_cluster(*meta_service); + + mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 20); + std::this_thread::sleep_for(std::chrono::seconds(1)); + + { + meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 1, "get_cluster:10000"); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 20); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 10000); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + { + meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 1, "get_cluster:1"); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::MAX_QPS_LIMIT, 1); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 1); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + { + meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 10000, + "get_cluster:10000"); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 20); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 10000); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + { + meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 10000, "get_cluster:1"); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::MAX_QPS_LIMIT, 1); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 1); + std::this_thread::sleep_for(std::chrono::seconds(1)); } - threads.clear(); -} \ No newline at end of file +} + +TEST(RateLimiterTest, AdjustDefaultLimitTest) { + auto meta_service = get_meta_service(); + mock_add_cluster(*meta_service); + + mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 1); + std::this_thread::sleep_for(std::chrono::seconds(1)); + { + meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 1, ""); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); + ASSERT_EQ(limit, 1); + limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 5000000); + } + { + meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 1000, + "get_cluster:5000"); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); + ASSERT_EQ(limit, 1000); + limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 5000); + } + { + meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 1000, "commit_txn:5000"); + auto limit = + meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); + ASSERT_EQ(limit, 5000); + limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 5000); + } +} From 6be65b4f050340ca0ff0f1f9a4fad48f018bdfc3 Mon Sep 17 00:00:00 2001 From: Siyang Tang Date: Thu, 31 Oct 2024 20:39:03 +0800 Subject: [PATCH 2/3] refactor and support adjusting by instance id --- cloud/src/meta-service/meta_service_http.cpp | 149 ++++++++---- cloud/src/rate-limiter/rate_limiter.cpp | 96 ++++---- cloud/src/rate-limiter/rate_limiter.h | 33 ++- cloud/test/meta_service_http_test.cpp | 87 +++---- cloud/test/rate_limiter_test.cpp | 225 ++++++++++++++----- 5 files changed, 398 insertions(+), 192 deletions(-) diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp index eb28b3bc5a6a7b..5c9e96269c2ebb 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -30,7 +30,10 @@ #include #include +#include +#include #include +#include #include #include #include @@ -339,62 +342,120 @@ static HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { const auto& uri = cntl->http_request().uri(); - bool is_attr_set = false; - int64_t default_qps_limit = -1; - if (auto default_qps_limit_str = std::string(http_query(uri, "default_qps_limit")); - !default_qps_limit_str.empty()) { + auto qps_limit_str = std::string {http_query(uri, "qps_limit")}; + auto rpc_name = std::string {http_query(uri, "rpc_name")}; + auto instance_id = std::string {http_query(uri, "instance_id")}; + + auto process_invalid_arguments = [&]() -> HttpResponse { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("invalid argument: qps_limit(required)={}, " + "rpc_name(optional)={}, instance_id(optional)={}", + qps_limit_str, rpc_name, instance_id)); + }; + + static auto parse_qps_limit = + [](const std::string& qps_limit_str) -> std::variant { + DCHECK(!qps_limit_str.empty()); + int64_t qps_limit = -1; try { - default_qps_limit = std::stoll(default_qps_limit_str); + qps_limit = std::stoll(qps_limit_str); } catch (const std::exception& ex) { return http_json_reply( MetaServiceCode::INVALID_ARGUMENT, - fmt::format("param `default_qps_limit` is not a legal int64 type:{}", - ex.what())); + fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what())); } - if (default_qps_limit < 0) { + if (qps_limit < 0) { return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "`default_qps_limit` should not be less than 0"); + "`qps_limit` should not be less than 0"); } - is_attr_set |= true; - } - std::string_view specific_max_qps_limit = http_query(uri, "specific_max_qps_limit"); - is_attr_set |= (!specific_max_qps_limit.empty()); - if (!is_attr_set) { - return http_json_reply( - MetaServiceCode::INVALID_ARGUMENT, - "default_qps_limit(int64) or " - "specific_max_qps_limit(list of[rpcname:qps(int64);]) is required as query param"); - } - auto rate_limiter = service->rate_limiter(); - rate_limiter->reset_rate_limit(service, default_qps_limit, specific_max_qps_limit.data()); - return http_json_reply(MetaServiceCode::OK, "success to adjust rate limit"); + return qps_limit; + }; + + auto process_set_qps_limit = [&](std::function cb) -> HttpResponse { + return std::visit( + [&](auto&& parse_result) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return parse_result; + } else { + if (cb(parse_result)) { + return http_json_reply(MetaServiceCode::OK, + "sucess to adjust rate limit"); + } + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + fmt::format("failed to adjust rate limit for qps_limit={}, " + "rpc_name={}, instance_id={}, plz ensure correct " + "rpc/instance name", + qps_limit_str, rpc_name, instance_id)); + } + }, + parse_qps_limit(qps_limit_str)); + }; + + auto set_global_qps_limit = [process_set_qps_limit, service]() { + return process_set_qps_limit([service](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit); + }); + }; + + auto set_rpc_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name); + }); + }; + + auto set_instance_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_instance_rate_limit(qps_limit, instance_id); + }); + }; + + auto set_instance_rpc_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name, instance_id); + }); + }; + + // for 8 element in true table of params, register processor cb + std::array, 8> processors; + std::fill_n(processors.begin(), 8, std::move(process_invalid_arguments)); + processors[0b001] = std::move(set_global_qps_limit); + processors[0b011] = std::move(set_rpc_qps_limit); + processors[0b101] = std::move(set_instance_qps_limit); + processors[0b111] = std::move(set_instance_rpc_qps_limit); + + uint8_t level = (0x01 & qps_limit_str.empty()) | ((0x01 & rpc_name.empty()) << 1) | + ((0x01 & instance_id.empty()) << 2); + + DCHECK_LT(level, 8); + + return processors[level](); } static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { - const auto& uri = cntl->http_request().uri(); - auto rpc_name = std::string(http_query(uri, "rpc_name")); auto rate_limiter = service->rate_limiter(); rapidjson::Document d; - if (rpc_name.empty()) { - auto get_qps_limit = [&d](std::string_view rpc_name, - std::shared_ptr rpc_limiter) { - auto max_qps_limit = std::to_string(rpc_limiter->max_qps_limit()); - d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), - rapidjson::StringRef(max_qps_limit.data(), max_qps_limit.size()), - d.GetAllocator()); + auto get_qps_limit = [&d](std::string_view rpc_name, + std::shared_ptr rpc_limiter) { + rapidjson::Document node; + rapidjson::Document sub; + auto get_qps_token_limit = [&](std::string_view instance_id, + std::shared_ptr qps_token) { + sub.AddMember(rapidjson::StringRef(instance_id.data(), instance_id.size()), + qps_token->max_qps_limit(), d.GetAllocator()); }; - rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit)); - } else { - auto rpc_limiter = rate_limiter->get_rpc_rate_limiter(rpc_name); - if (rpc_limiter == nullptr) [[unlikely]] { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("rpc_name={} is not exists", rpc_name)); - } + rpc_limiter->for_each_qps_token(std::move(get_qps_token_limit)); + auto max_qps_limit = std::to_string(rpc_limiter->max_qps_limit()); - d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), - rapidjson::StringRef(max_qps_limit.data(), max_qps_limit.size()), - d.GetAllocator()); - } + node.AddMember("RPC qps limit", + rapidjson::StringRef(max_qps_limit.data(), max_qps_limit.size()), + d.GetAllocator()); + node.AddMember("instance specific qps limit", sub, d.GetAllocator()); + d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), node, d.GetAllocator()); + }; + rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit)); + rapidjson::StringBuffer sb; rapidjson::PrettyWriter writer(sb); d.Accept(writer); @@ -684,13 +745,13 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, {"alter_ram_user", process_alter_ram_user}, {"alter_iam", process_alter_iam}, {"adjust_rate_limit", process_adjust_rate_limit}, - {"query_rate_limit", process_query_rate_limit}, + {"list_rate_limit", process_query_rate_limit}, {"v1/abort_txn", process_abort_txn}, {"v1/abort_tablet_job", process_abort_tablet_job}, {"v1/alter_ram_user", process_alter_ram_user}, {"v1/alter_iam", process_alter_iam}, {"v1/adjust_rate_limit", process_adjust_rate_limit}, - {"v1/query_rate_limit", process_query_rate_limit}, + {"v1/list_rate_limit", process_query_rate_limit}, }; auto* cntl = static_cast(controller); diff --git a/cloud/src/rate-limiter/rate_limiter.cpp b/cloud/src/rate-limiter/rate_limiter.cpp index b7151a1e9e8535..1d7d0a10ac89df 100644 --- a/cloud/src/rate-limiter/rate_limiter.cpp +++ b/cloud/src/rate-limiter/rate_limiter.cpp @@ -17,12 +17,15 @@ #include "rate_limiter.h" +#include #include +#include #include #include #include #include +#include #include #include "common/bvars.h" @@ -64,7 +67,7 @@ void for_each_rpc_name(google::protobuf::Service* service, Callable cb) { void RateLimiter::init(google::protobuf::Service* service) { auto rpc_name_to_specific_limit = parse_specific_qps_limit(config::specific_max_qps_limit); - std::unique_lock write_lock(shared_mtx_); + std::unique_lock write_lock(mutex_); for_each_rpc_name(service, [&](const std::string& rpc_name) { auto it = rpc_name_to_specific_limit.find(rpc_name); int64_t max_qps_limit = config::default_max_qps_limit; @@ -79,7 +82,6 @@ void RateLimiter::init(google::protobuf::Service* service) { } std::shared_ptr RateLimiter::get_rpc_rate_limiter(const std::string& rpc_name) { - std::shared_lock read_lock(shared_mtx_); auto it = limiters_.find(rpc_name); if (it == limiters_.end()) { return nullptr; @@ -87,43 +89,40 @@ std::shared_ptr RateLimiter::get_rpc_rate_limiter(const std::str return it->second; } -void RateLimiter::reset_rate_limit(google::protobuf::Service* service, int64_t default_qps_limit, - const std::string& specific_max_qps_limit) { - // TODO: merge specific_max_qps_limit - auto specific_limits = parse_specific_qps_limit(specific_max_qps_limit); +bool RateLimiter::set_rate_limit(int64_t qps_limit) { + std::lock_guard lock(mutex_); + auto filter = [this](const auto& kv) { return !rpc_with_specific_limit_.contains(kv.first); }; + for (const auto& [_, v] : limiters_ | std::views::filter(std::move(filter))) { + v->set_max_qps_limit(qps_limit); + } + return true; +} - auto reset_specific_limit = [&](const std::string& rpc_name) -> bool { - if (auto it = specific_limits.find(rpc_name); it != specific_limits.end()) { - limiters_[rpc_name]->reset_max_qps_limit(it->second); - return true; - } +bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& rpc_name) { + if (!limiters_.contains(rpc_name)) { return false; - }; - auto reset_default_limit = [&](const std::string& rpc_name) { - if (rpc_with_specific_limit_.contains(rpc_name)) { - return; - } - limiters_[rpc_name]->reset_max_qps_limit(default_qps_limit); - }; - - std::unique_lock write_lock(shared_mtx_); - for (const auto& [k, _] : specific_limits) { - rpc_with_specific_limit_.insert(k); - } - if (default_qps_limit < 0) { - for_each_rpc_name(service, std::move(reset_specific_limit)); - return; } - if (specific_limits.empty()) { - for_each_rpc_name(service, std::move(reset_default_limit)); - return; + auto limiter = limiters_.at(rpc_name); + std::lock_guard lock(mutex_); + limiter->set_max_qps_limit(qps_limit); + rpc_with_specific_limit_.insert(rpc_name); + return true; +} + +bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& rpc_name, + const std::string& instance_id) { + if (!limiters_.contains(rpc_name)) { + return false; } - for_each_rpc_name(service, [&](const std::string& rpc_name) { - if (reset_specific_limit(rpc_name)) { - return; - } - reset_default_limit(rpc_name); + auto limiter = limiters_.at(rpc_name); + return limiter->set_max_qps_limit(qps_limit, instance_id); +} + +bool RateLimiter::set_instance_rate_limit(int64_t qps_limit, const std::string& instance_id) { + return std::ranges::all_of(limiters_, [&](const auto& kv) { + return kv.second->set_max_qps_limit(qps_limit, instance_id); }); + return true; } void RateLimiter::for_each_rpc_limiter( @@ -155,11 +154,32 @@ bool RpcRateLimiter::get_qps_token(const std::string& instance_id, return qps_token->get_token(get_bvar_qps); } -void RpcRateLimiter::reset_max_qps_limit(int64_t max_qps_limit) { +void RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit) { std::lock_guard l(mutex_); max_qps_limit_ = max_qps_limit; - for (auto& [_, v] : qps_limiter_) { - v->reset_max_qps_limit(max_qps_limit); + auto filter = [this](const auto& kv) { + return !instance_with_specific_limit_.contains(kv.first); + }; + for (auto& [k, v] : qps_limiter_ | std::views::filter(std::move(filter))) { + v->set_max_qps_limit(max_qps_limit); + } +} + +bool RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit, const std::string& instance_id) { + std::lock_guard l(mutex_); + if (!qps_limiter_.contains(instance_id)) { + qps_limiter_[instance_id] = std::make_shared(max_qps_limit); + } else { + qps_limiter_.at(instance_id)->set_max_qps_limit(max_qps_limit); + } + instance_with_specific_limit_.insert(instance_id); + return true; +} + +void RpcRateLimiter::for_each_qps_token( + std::function)> cb) { + for (const auto& [instance_id, qps_token] : qps_limiter_) { + cb(instance_id, qps_token); } } @@ -180,7 +200,7 @@ bool RpcRateLimiter::QpsToken::get_token(std::function& get_bvar_qps) { return current_qps_ < max_qps_limit_; } -void RpcRateLimiter::QpsToken::reset_max_qps_limit(int64_t max_qps_limit) { +void RpcRateLimiter::QpsToken::set_max_qps_limit(int64_t max_qps_limit) { std::lock_guard l(mutex_); max_qps_limit_ = max_qps_limit; } diff --git a/cloud/src/rate-limiter/rate_limiter.h b/cloud/src/rate-limiter/rate_limiter.h index 7909f98188feb3..06a862242be7f4 100644 --- a/cloud/src/rate-limiter/rate_limiter.h +++ b/cloud/src/rate-limiter/rate_limiter.h @@ -43,17 +43,27 @@ class RateLimiter { std::shared_ptr get_rpc_rate_limiter(const std::string& rpc_name); - void reset_rate_limit(google::protobuf::Service* service, int64_t default_qps_limit, - const std::string& specific_max_qps_limit); - void for_each_rpc_limiter( std::function)> cb); + // set global default rate limit, will not infulence rpc and instance specific qps limit setting + bool set_rate_limit(int64_t qps_limit); + + // set rpc level rate limit, will not infulence instance specific qps limit setting + bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name); + + // set instance level rate limit for specific rpc + bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name, + const std::string& instance_id); + + // set instance level rate limit globally, will influence settings for the same instance of specific rpc + bool set_instance_rate_limit(int64_t qps_limit, const std::string& instance_id); + private: // rpc_name -> RpcRateLimiter std::unordered_map> limiters_; std::unordered_set rpc_with_specific_limit_; - std::shared_mutex shared_mtx_; + bthread::Mutex mutex_; }; class RpcRateLimiter { @@ -75,18 +85,19 @@ class RpcRateLimiter { int64_t max_qps_limit() const { return max_qps_limit_; } - void reset_max_qps_limit(int64_t max_qps_limit); + void set_max_qps_limit(int64_t max_qps_limit); - // Todo: Recycle outdated instance_id + bool set_max_qps_limit(int64_t max_qps_limit, const std::string& instance); -private: class QpsToken { public: QpsToken(const int64_t max_qps_limit) : max_qps_limit_(max_qps_limit) {} bool get_token(std::function& get_bvar_qps); - void reset_max_qps_limit(int64_t max_qps_limit); + void set_max_qps_limit(int64_t max_qps_limit); + + int64_t max_qps_limit() const { return max_qps_limit_; } private: bthread::Mutex mutex_; @@ -96,9 +107,15 @@ class RpcRateLimiter { int64_t max_qps_limit_; }; + void for_each_qps_token(std::function)> cb); + + // Todo: Recycle outdated instance_id + +private: bthread::Mutex mutex_; // instance_id -> QpsToken std::unordered_map> qps_limiter_; + std::unordered_set instance_with_specific_limit_; std::string rpc_name_; int64_t max_qps_limit_; }; diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index 37791a8f0aa742..4360efeb4422a9 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -1538,63 +1538,81 @@ TEST(MetaServiceHttpTest, get_obj_store_info_response_sk) { TEST(MetaServiceHttpTest, AdjustRateLimit) { HttpContext ctx; { - auto [status_code, content] = ctx.query( - "adjust_rate_limit", - "default_qps_limit=10000&specific_max_qps_limit=get_cluster:10000"); + auto [status_code, content] = + ctx.query("adjust_rate_limit", "qps_limit=10000"); ASSERT_EQ(status_code, 200); } { auto [status_code, content] = - ctx.query("adjust_rate_limit", "default_qps_limit=10000"); + ctx.query("adjust_rate_limit", "qps_limit=10000&rpc_name=get_cluster"); ASSERT_EQ(status_code, 200); } { auto [status_code, content] = ctx.query( - "adjust_rate_limit", "specific_max_qps_limit=get_cluster:10000"); + "adjust_rate_limit", + "qps_limit=10000&rpc_name=get_cluster&instance_id=test_instance"); ASSERT_EQ(status_code, 200); } { - auto [status_code, content] = ctx.query("adjust_rate_limit", ""); + auto [status_code, content] = ctx.query( + "adjust_rate_limit", "qps_limit=10000&instance_id=test_instance"); + ASSERT_EQ(status_code, 200); + } + { + auto [status_code, content] = + ctx.query("adjust_rate_limit", "qps_limit=invalid"); ASSERT_EQ(status_code, 400); - std::string msg = - "default_qps_limit(int64) or specific_max_qps_limit(list of[rpcname:qps(int64);]) " - "is required as query param"; - ASSERT_TRUE(content.find(msg) != std::string::npos); + std::string msg = "param `qps_limit` is not a legal int64 type:"; + ASSERT_NE(content.find(msg), std::string::npos); } { - auto [status_code, content] = ctx.query("adjust_rate_limit", "key=abc"); + auto [status_code, content] = ctx.query("adjust_rate_limit", "qps_limit=-1"); ASSERT_EQ(status_code, 400); - std::string msg = - "default_qps_limit(int64) or specific_max_qps_limit(list of[rpcname:qps(int64);]) " - "is required as query param"; - ASSERT_TRUE(content.find(msg) != std::string::npos); + std::string msg = "qps_limit` should not be less than 0"; + ASSERT_NE(content.find(msg), std::string::npos); } { auto [status_code, content] = - ctx.query("adjust_rate_limit", "default_qps_limit=invalid"); + ctx.query("adjust_rate_limit", "rpc_name=get_cluster"); ASSERT_EQ(status_code, 400); - std::string msg = "param `qps_limit` is not a legal int64 type:"; - ASSERT_TRUE(content.find(msg) != std::string::npos); + std::string msg = "invalid argument:"; + ASSERT_NE(content.find(msg), std::string::npos); } { - auto [status_code, content] = ctx.query( - "adjust_rate_limit", - "specific_max_qps_limit=get_cluster:10000&default_qps_limit=invalid"); + auto [status_code, content] = + ctx.query("adjust_rate_limit", "instance_id=test_instance"); ASSERT_EQ(status_code, 400); - std::string msg = "param `qps_limit` is not a legal int64 type:"; - ASSERT_TRUE(content.find(msg) != std::string::npos); + std::string msg = "invalid argument:"; + ASSERT_NE(content.find(msg), std::string::npos); } { auto [status_code, content] = ctx.query( - "adjust_rate_limit", - "specific_max_qps_limit=get_cluster:invalid&default_qps_limit=10000"); - // note: invalid so will not take effect, but return ok, by design + "adjust_rate_limit", "rpc_name=get_cluster&instance_id=test_instance"); + ASSERT_EQ(status_code, 400); + std::string msg = "invalid argument:"; + ASSERT_NE(content.find(msg), std::string::npos); + } + { + auto [status_code, content] = ctx.query("adjust_rate_limit", ""); + ASSERT_EQ(status_code, 400); + std::string msg = "invalid argument:"; + ASSERT_NE(content.find(msg), std::string::npos); + } + { + auto [status_code, content] = + ctx.query("adjust_rate_limit", "qps_limit=1000&rpc_name=invalid"); + ASSERT_EQ(status_code, 400); + std::string msg = "failed to adjust rate limit for qps_limit"; + ASSERT_NE(content.find(msg), std::string::npos); + } + { + auto [status_code, content] = + ctx.query("adjust_rate_limit", "qps_limit=1000&instance_id=invalid"); ASSERT_EQ(status_code, 200); } { auto [status_code, content] = ctx.query( - "adjust_rate_limit", "specific_max_qps_limit=xxx:10000&default_qps_limit=10000"); - // note: invalid so will not take effect, but return ok, by design + "adjust_rate_limit", "qps_limit=1000&rpc_name=get_cluster&instance_id=invalid"); ASSERT_EQ(status_code, 200); } } @@ -1602,20 +1620,9 @@ TEST(MetaServiceHttpTest, AdjustRateLimit) { TEST(MetaServiceHttpTest, QueryRateLimit) { HttpContext ctx; { - auto [status_code, content] = ctx.query("query_rate_limit", ""); - ASSERT_EQ(status_code, 200); - } - { - auto [status_code, content] = - ctx.query("query_rate_limit", "rpc_name=get_cluster"); + auto [status_code, content] = ctx.query("list_rate_limit", ""); ASSERT_EQ(status_code, 200); } - { - auto [status_code, content] = ctx.query("query_rate_limit", "rpc_name=xxx"); - ASSERT_EQ(status_code, 400); - std::string msg = "rpc_name=xxx is not exists"; - ASSERT_TRUE(content.find(msg) != std::string::npos); - } } } // namespace doris::cloud diff --git a/cloud/test/rate_limiter_test.cpp b/cloud/test/rate_limiter_test.cpp index 0ff0565178532c..cab7b01774c247 100644 --- a/cloud/test/rate_limiter_test.cpp +++ b/cloud/test/rate_limiter_test.cpp @@ -29,6 +29,7 @@ #include "meta-service/meta_service.h" #include "meta-service/txn_kv_error.h" #include "mock_resource_manager.h" +#include "resource-manager/resource_manager.h" int main(int argc, char** argv) { doris::cloud::config::init(nullptr, true); @@ -38,24 +39,50 @@ int main(int argc, char** argv) { using namespace doris::cloud; +const std::string mock_instance_0 = "mock_instance_0"; +const std::string mock_instance_1 = "mock_instance_1"; +const std::string mock_cluster_0 = "mock_cluster_0"; +const std::string mock_cluster_1 = "mock_cluster_1"; +const std::string mock_cluster_id_0 = "mock_cluster_id_0"; +const std::string mock_cluster_id_1 = "mock_cluster_id_1"; +const std::string mock_cloud_unique_id_0 = "mock_cloud_unique_id_0"; +const std::string mock_cloud_unique_id_1 = "mock_cloud_unique_id_1"; + +class MockMultiInstanceRsMgr : public MockResourceManager { +public: + using MockResourceManager::MockResourceManager; + + std::string get_node(const std::string& cloud_unique_id, + std::vector* nodes) override { + if (cloud_unique_id == mock_cloud_unique_id_0) { + nodes->emplace_back(Role::COMPUTE_NODE, mock_instance_0, mock_cluster_0, + mock_cluster_id_0); + } else if (cloud_unique_id == mock_cloud_unique_id_1) { + nodes->emplace_back(Role::COMPUTE_NODE, mock_instance_1, mock_cluster_1, + mock_cluster_id_1); + } + return {}; + }; +}; + std::unique_ptr get_meta_service() { auto txn_kv = std::dynamic_pointer_cast(std::make_shared()); [&] { ASSERT_NE(txn_kv.get(), nullptr); }(); - auto rs = std::make_shared(txn_kv); + auto rs = std::make_shared(txn_kv); auto rl = std::make_shared(); auto meta_service = std::make_unique(txn_kv, rs, rl); return std::make_unique(std::move(meta_service)); } -void mock_add_cluster(MetaServiceProxy& meta_service) { +void mock_add_cluster(MetaServiceProxy& meta_service, std::string instance_id) { // add cluster first - InstanceKeyInfo key_info {mock_instance}; + InstanceKeyInfo key_info {instance_id}; std::string key; std::string val; instance_key(key_info, &key); InstanceInfoPB instance; - instance.set_instance_id(mock_instance); + instance.set_instance_id(instance_id); ClusterPB c1; c1.set_cluster_name(mock_cluster_name); c1.set_cluster_id(mock_cluster_id); @@ -70,9 +97,10 @@ void mock_add_cluster(MetaServiceProxy& meta_service) { ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); } -void mock_get_cluster(MetaServiceProxy& meta_service, MetaServiceCode code) { +void mock_get_cluster(MetaServiceProxy& meta_service, const std::string& cloud_uid, + MetaServiceCode code) { GetClusterRequest req; - req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_cloud_unique_id(cloud_uid); req.set_cluster_id(mock_cluster_id); req.set_cluster_name(mock_cluster_name); brpc::Controller cntl; @@ -84,11 +112,11 @@ void mock_get_cluster(MetaServiceProxy& meta_service, MetaServiceCode code) { } template -void mock_parallel_rpc(Rpc rpc, MetaServiceProxy* meta_service, MetaServiceCode expected, - size_t times) { +void mock_parallel_rpc(Rpc rpc, MetaServiceProxy* meta_service, const std::string& cloud_uid, + MetaServiceCode expected, size_t times) { std::vector threads; for (size_t i = 0; i < times; ++i) { - threads.emplace_back([&]() { rpc(*meta_service, expected); }); + threads.emplace_back([&]() { rpc(*meta_service, cloud_uid, expected); }); } for (auto& t : threads) { t.join(); @@ -97,99 +125,172 @@ void mock_parallel_rpc(Rpc rpc, MetaServiceProxy* meta_service, MetaServiceCode TEST(RateLimiterTest, RateLimitGetClusterTest) { auto meta_service = get_meta_service(); - mock_add_cluster(*meta_service); + mock_add_cluster(*meta_service, mock_instance_0); - mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 20); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 20); std::this_thread::sleep_for(std::chrono::seconds(1)); meta_service->rate_limiter() ->get_rpc_rate_limiter("get_cluster") - ->qps_limiter_[mock_instance] + ->qps_limiter_[mock_instance_0] ->max_qps_limit_ = 1; - mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::MAX_QPS_LIMIT, 1); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::MAX_QPS_LIMIT, 1); std::this_thread::sleep_for(std::chrono::seconds(1)); meta_service->rate_limiter() ->get_rpc_rate_limiter("get_cluster") - ->qps_limiter_[mock_instance] + ->qps_limiter_[mock_instance_0] ->max_qps_limit_ = 10000; - mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 1); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 1); } -TEST(RateLimiterTest, AdjustSpecificLimitTest) { +TEST(RateLimiterTest, AdjustLimitInfluenceTest) { auto meta_service = get_meta_service(); - mock_add_cluster(*meta_service); + mock_add_cluster(*meta_service, mock_instance_0); + mock_add_cluster(*meta_service, mock_instance_1); - mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 20); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 1); std::this_thread::sleep_for(std::chrono::seconds(1)); + { + ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(1, mock_instance_1)); + ASSERT_TRUE( + meta_service->rate_limiter()->set_rate_limit(100, "get_cluster", mock_instance_0)); + auto limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_0) + ->max_qps_limit(); + ASSERT_EQ(limit, 100); + limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_1) + ->max_qps_limit(); + ASSERT_EQ(limit, 1); + } { - meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 1, "get_cluster:10000"); - mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 20); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1)); auto limit = - meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); - ASSERT_EQ(limit, 10000); - std::this_thread::sleep_for(std::chrono::seconds(1)); + meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); + ASSERT_EQ(limit, 1); + limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 5000000); } - { - meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 1, "get_cluster:1"); - mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::MAX_QPS_LIMIT, 1); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(5000, "get_cluster")); auto limit = - meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); ASSERT_EQ(limit, 1); - std::this_thread::sleep_for(std::chrono::seconds(1)); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1000)); + limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 5000); + limit = meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); + ASSERT_EQ(limit, 1000); } - { - meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 10000, - "get_cluster:10000"); - mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 20); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(3000, "commit_txn")); auto limit = - meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); - ASSERT_EQ(limit, 10000); - std::this_thread::sleep_for(std::chrono::seconds(1)); + meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); + ASSERT_EQ(limit, 3000); + limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 5000); } - { - meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 10000, "get_cluster:1"); - mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::MAX_QPS_LIMIT, 1); - auto limit = - meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + auto limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_0) + ->max_qps_limit(); + ASSERT_EQ(limit, 100); + limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_1) + ->max_qps_limit(); ASSERT_EQ(limit, 1); - std::this_thread::sleep_for(std::chrono::seconds(1)); + } + { + ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(200, mock_instance_1)); + auto limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_0) + ->max_qps_limit(); + ASSERT_EQ(limit, 100); + limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_1) + ->max_qps_limit(); + ASSERT_EQ(limit, 200); } } -TEST(RateLimiterTest, AdjustDefaultLimitTest) { +TEST(RateLimiterTest, AdjustLimitMockRPCTest) { auto meta_service = get_meta_service(); - mock_add_cluster(*meta_service); - - mock_parallel_rpc(mock_get_cluster, meta_service.get(), MetaServiceCode::OK, 1); + mock_add_cluster(*meta_service, mock_instance_0); + mock_add_cluster(*meta_service, mock_instance_1); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 20); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1, + MetaServiceCode::OK, 20); std::this_thread::sleep_for(std::chrono::seconds(1)); + { - meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 1, ""); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, "get_cluster")); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1)); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 20); auto limit = - meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); - ASSERT_EQ(limit, 1); - limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); - ASSERT_EQ(limit, 5000000); + meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 10000); + std::this_thread::sleep_for(std::chrono::seconds(1)); } { - meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 1000, - "get_cluster:5000"); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1, "get_cluster")); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1)); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::MAX_QPS_LIMIT, 1); auto limit = - meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); - ASSERT_EQ(limit, 1000); - limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); - ASSERT_EQ(limit, 5000); + meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); + ASSERT_EQ(limit, 1); + std::this_thread::sleep_for(std::chrono::seconds(1)); } { - meta_service->rate_limiter()->reset_rate_limit(meta_service.get(), 1000, "commit_txn:5000"); - auto limit = - meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit(); - ASSERT_EQ(limit, 5000); - limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit(); - ASSERT_EQ(limit, 5000); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, "get_cluster", + mock_instance_0)); + ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, "get_cluster", + mock_instance_1)); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::OK, 20); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1, + MetaServiceCode::OK, 20); + auto limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_0) + ->max_qps_limit(); + ASSERT_EQ(limit, 10000); + limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_1) + ->max_qps_limit(); + ASSERT_EQ(limit, 10000); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + { + ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(1, mock_instance_0)); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0, + MetaServiceCode::MAX_QPS_LIMIT, 1); + mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1, + MetaServiceCode::OK, 20); + auto limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_0) + ->max_qps_limit(); + ASSERT_EQ(limit, 1); + limit = meta_service->rate_limiter() + ->get_rpc_rate_limiter("get_cluster") + ->qps_limiter_.at(mock_instance_1) + ->max_qps_limit(); + ASSERT_EQ(limit, 10000); } } From 70f960777a90f033e15fef5f7de9c6062e072eee Mon Sep 17 00:00:00 2001 From: Siyang Tang Date: Tue, 12 Nov 2024 20:35:28 +0800 Subject: [PATCH 3/3] fix reviewd problems --- cloud/src/meta-service/meta_service_http.cpp | 52 +++++++------------- cloud/src/rate-limiter/rate_limiter.h | 41 +++++++++++++-- 2 files changed, 56 insertions(+), 37 deletions(-) diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp index 5c9e96269c2ebb..2f7536e9989a6c 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -346,15 +347,7 @@ static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Co auto rpc_name = std::string {http_query(uri, "rpc_name")}; auto instance_id = std::string {http_query(uri, "instance_id")}; - auto process_invalid_arguments = [&]() -> HttpResponse { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("invalid argument: qps_limit(required)={}, " - "rpc_name(optional)={}, instance_id(optional)={}", - qps_limit_str, rpc_name, instance_id)); - }; - - static auto parse_qps_limit = - [](const std::string& qps_limit_str) -> std::variant { + auto process_set_qps_limit = [&](std::function cb) -> HttpResponse { DCHECK(!qps_limit_str.empty()); int64_t qps_limit = -1; try { @@ -368,29 +361,14 @@ static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Co return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "`qps_limit` should not be less than 0"); } - return qps_limit; - }; - - auto process_set_qps_limit = [&](std::function cb) -> HttpResponse { - return std::visit( - [&](auto&& parse_result) { - using T = std::decay_t; - if constexpr (std::is_same_v) { - return parse_result; - } else { - if (cb(parse_result)) { - return http_json_reply(MetaServiceCode::OK, - "sucess to adjust rate limit"); - } - return http_json_reply( - MetaServiceCode::INVALID_ARGUMENT, - fmt::format("failed to adjust rate limit for qps_limit={}, " - "rpc_name={}, instance_id={}, plz ensure correct " - "rpc/instance name", - qps_limit_str, rpc_name, instance_id)); - } - }, - parse_qps_limit(qps_limit_str)); + if (cb(qps_limit)) { + return http_json_reply(MetaServiceCode::OK, "sucess to adjust rate limit"); + } + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("failed to adjust rate limit for qps_limit={}, " + "rpc_name={}, instance_id={}, plz ensure correct " + "rpc/instance name", + qps_limit_str, rpc_name, instance_id)); }; auto set_global_qps_limit = [process_set_qps_limit, service]() { @@ -417,7 +395,15 @@ static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Co }); }; - // for 8 element in true table of params, register processor cb + auto process_invalid_arguments = [&]() -> HttpResponse { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("invalid argument: qps_limit(required)={}, " + "rpc_name(optional)={}, instance_id(optional)={}", + qps_limit_str, rpc_name, instance_id)); + }; + + // We have 3 optional params and 2^3 combination, and 4 of them are illegal. + // We register callbacks for them in porcessors accordings to the level, represented by 3 bits. std::array, 8> processors; std::fill_n(processors.begin(), 8, std::move(process_invalid_arguments)); processors[0b001] = std::move(set_global_qps_limit); diff --git a/cloud/src/rate-limiter/rate_limiter.h b/cloud/src/rate-limiter/rate_limiter.h index 06a862242be7f4..e557d0d8a10798 100644 --- a/cloud/src/rate-limiter/rate_limiter.h +++ b/cloud/src/rate-limiter/rate_limiter.h @@ -43,25 +43,47 @@ class RateLimiter { std::shared_ptr get_rpc_rate_limiter(const std::string& rpc_name); + /** + * @brief for each rpc limiter, apply callback + * + * @param cb callback function with params rpc name and rate limiter + */ void for_each_rpc_limiter( std::function)> cb); - // set global default rate limit, will not infulence rpc and instance specific qps limit setting + /** + * @brief set global default rate limit, will not infulence rpc and instance specific qps limit setting + * + * @return true if set sucessfully + */ bool set_rate_limit(int64_t qps_limit); - // set rpc level rate limit, will not infulence instance specific qps limit setting + /** + * @brief set rpc level rate limit, will not infulence instance specific qps limit setting + * + * @return true if set sucessfully + */ bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name); - // set instance level rate limit for specific rpc + /** + * @brief set instance level rate limit for specific rpc + * + * @return true if set sucessfully + */ bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name, const std::string& instance_id); - // set instance level rate limit globally, will influence settings for the same instance of specific rpc + /** + * @brief set instance level rate limit globally, will influence settings for the same instance of specific rpc + * + * @return true if set sucessfully + */ bool set_instance_rate_limit(int64_t qps_limit, const std::string& instance_id); private: // rpc_name -> RpcRateLimiter std::unordered_map> limiters_; + // rpc names which specific limit have been set std::unordered_set rpc_with_specific_limit_; bthread::Mutex mutex_; }; @@ -85,8 +107,18 @@ class RpcRateLimiter { int64_t max_qps_limit() const { return max_qps_limit_; } + /** + * @brief set max qps limit for this limiter + * + * @return true if set sucessfully + */ void set_max_qps_limit(int64_t max_qps_limit); + /** + * @brief set max qps limit for specific instance within this limiter + * + * @return true if set sucessfully + */ bool set_max_qps_limit(int64_t max_qps_limit, const std::string& instance); class QpsToken { @@ -115,6 +147,7 @@ class RpcRateLimiter { bthread::Mutex mutex_; // instance_id -> QpsToken std::unordered_map> qps_limiter_; + // instance ids which specific limit have been set std::unordered_set instance_with_specific_limit_; std::string rpc_name_; int64_t max_qps_limit_;