diff --git a/configs/envoy_service_to_service.template.json b/configs/envoy_service_to_service.template.json index f9bacca90c114..88afe787397cc 100644 --- a/configs/envoy_service_to_service.template.json +++ b/configs/envoy_service_to_service.template.json @@ -131,7 +131,10 @@ "filters": [ {"type": "decoder", "name": "rate_limit", "config": { - "domain": "envoy_service_to_service" + "domain": "envoy_service_to_service", + "actions": [ + {"type": "service_to_service"} + ] } }, {"type": "both", "name": "grpc_http1_bridge", "config": {}}, diff --git a/docs/configuration/http_filters/rate_limit_filter.rst b/docs/configuration/http_filters/rate_limit_filter.rst index 7df74499b77bd..c8fe975a36df7 100644 --- a/docs/configuration/http_filters/rate_limit_filter.rst +++ b/docs/configuration/http_filters/rate_limit_filter.rst @@ -7,17 +7,10 @@ Global rate limiting :ref:`architecture overview <arch_overview_rate_limit>`. The HTTP rate limit filter will call the rate limit service when the request's route has the *global* property set in the :ref:`rate limit configuration -<config_http_conn_man_route_table_route_rate_limit>`. If the rate limit service is called, the -following descriptors are sent: +<config_http_conn_man_route_table_route_rate_limit>`. - * ("to_cluster", "<:ref:`route target cluster <config_http_conn_man_route_table_route_cluster>`>") - * ("to_cluster", "<:ref:`route target cluster <config_http_conn_man_route_table_route_cluster>`>"), - ("from_cluster", "<local service cluster>") - -<local service cluster> is derived from the :option:`--service-cluster` option. - -If the rate limit service is called, and the response for either of the above descriptors is over -limit, a 429 response is returned. +If the rate limit service is called, and the response for any of the descriptors is over limit, a +429 response is returned. .. code-block:: json @@ -25,13 +18,48 @@ limit, a 429 response is returned. "type": "decoder", "name": "rate_limit", "config": { - "domain": "..." + "domain": "...", + "actions": [] } } domain *(required, string)* The rate limit domain to use when calling the rate limit service. +actions + *(required, array)* An array of rate limiting actions to perform. Multiple actions can be + specified. 
The supported action types are documented below. + +Actions +------- + +.. code-block:: json + + { + "type": "..." + } + +type + *(required, string)* The type of rate limit action to perform. The currently supported action + type is *service_to_service*. + +Service to service +^^^^^^^^^^^^^^^^^^ + +.. code-block:: json + + { + "type": "service_to_service" + } + +The following descriptors are sent: + + * ("to_cluster", "<:ref:`route target cluster <config_http_conn_man_route_table_route_cluster>`>") + * ("to_cluster", "<:ref:`route target cluster <config_http_conn_man_route_table_route_cluster>`>"), + ("from_cluster", "<local service cluster>") + +<local service cluster> is derived from the :option:`--service-cluster` option. + Statistics ---------- diff --git a/source/common/http/filter/ratelimit.cc b/source/common/http/filter/ratelimit.cc index d50e2b7968f74..b4f42c4774cbb 100644 --- a/source/common/http/filter/ratelimit.cc +++ b/source/common/http/filter/ratelimit.cc @@ -9,39 +9,58 @@ #include "common/http/headers.h" namespace Http { +namespace RateLimit { -const Http::HeaderMapImpl RateLimitFilter::TOO_MANY_REQUESTS_HEADER{ +const Http::HeaderMapImpl Filter::TOO_MANY_REQUESTS_HEADER{ {Http::Headers::get().Status, std::to_string(enumToInt(Code::TooManyRequests))}}; -RateLimitFilterConfig::RateLimitFilterConfig(const Json::Object& config, - const std::string& local_service_cluster, - Stats::Store& stats_store, Runtime::Loader& runtime) +void ServiceToServiceAction::populateDescriptors(const Router::RouteEntry& route, + std::vector<::RateLimit::Descriptor>& descriptors, + FilterConfig& config) { + // We limit on 2 dimensions. + // 1) All calls to the given cluster. + // 2) Calls to the given cluster and from this cluster. + // The service side configuration can choose to limit on 1 or both of the above. 
+ descriptors.push_back({{{"to_cluster", route.clusterName()}}}); + descriptors.push_back( + {{{"to_cluster", route.clusterName()}, {"from_cluster", config.localServiceCluster()}}}); +} + +FilterConfig::FilterConfig(const Json::Object& config, const std::string& local_service_cluster, + Stats::Store& stats_store, Runtime::Loader& runtime) : domain_(config.getString("domain")), local_service_cluster_(local_service_cluster), - stats_store_(stats_store), runtime_(runtime) {} + stats_store_(stats_store), runtime_(runtime) { + for (const Json::Object& action : config.getObjectArray("actions")) { + std::string type = action.getString("type"); + if (type == "service_to_service") { + actions_.emplace_back(new ServiceToServiceAction()); + } else { + throw EnvoyException(fmt::format("unknown http rate limit filter action '{}'", type)); + } + } +} -FilterHeadersStatus RateLimitFilter::decodeHeaders(HeaderMap& headers, bool) { +FilterHeadersStatus Filter::decodeHeaders(HeaderMap& headers, bool) { if (!config_->runtime().snapshot().featureEnabled("ratelimit.http_filter_enabled", 100)) { return FilterHeadersStatus::Continue; } const Router::RouteEntry* route = callbacks_->routeTable().routeForRequest(headers); if (route && route->rateLimitPolicy().doGlobalLimiting()) { - cluster_stat_prefix_ = fmt::format("cluster.{}.", route->clusterName()); - cluster_ratelimit_stat_prefix_ = fmt::format("{}ratelimit.", cluster_stat_prefix_); - - // We limit on 2 dimensions. - // 1) All calls to the given cluster. - // 2) Calls to the given cluster and from this cluster. - // The service side configuration can choose to limit on 1 or both of the above. - // NOTE: In the future we might add more things such as the path of the request. 
- std::vector<RateLimit::Descriptor> descriptors = { - {{{"to_cluster", route->clusterName()}}}, - {{{"to_cluster", route->clusterName()}, {"from_cluster", config_->localServiceCluster()}}}}; - - state_ = State::Calling; - initiating_call_ = true; - client_->limit(*this, config_->domain(), descriptors); - initiating_call_ = false; + std::vector<::RateLimit::Descriptor> descriptors; + for (const ActionPtr& action : config_->actions()) { + action->populateDescriptors(*route, descriptors, *config_); + } + + if (!descriptors.empty()) { + cluster_stat_prefix_ = fmt::format("cluster.{}.", route->clusterName()); + cluster_ratelimit_stat_prefix_ = fmt::format("{}ratelimit.", cluster_stat_prefix_); + + state_ = State::Calling; + initiating_call_ = true; + client_->limit(*this, config_->domain(), descriptors); + initiating_call_ = false; + } } return (state_ == State::Calling || state_ == State::Responded) @@ -49,19 +68,19 @@ FilterHeadersStatus RateLimitFilter::decodeHeaders(HeaderMap& headers, bool) { : FilterHeadersStatus::Continue; } -FilterDataStatus RateLimitFilter::decodeData(Buffer::Instance&, bool) { +FilterDataStatus Filter::decodeData(Buffer::Instance&, bool) { ASSERT(state_ != State::Responded); return state_ == State::Calling ? FilterDataStatus::StopIterationAndBuffer : FilterDataStatus::Continue; } -FilterTrailersStatus RateLimitFilter::decodeTrailers(HeaderMap&) { +FilterTrailersStatus Filter::decodeTrailers(HeaderMap&) { ASSERT(state_ != State::Responded); return state_ == State::Calling ? 
FilterTrailersStatus::StopIteration : FilterTrailersStatus::Continue; } -void RateLimitFilter::setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& callbacks) { +void Filter::setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& callbacks) { callbacks_ = &callbacks; callbacks.addResetStreamCallback([this]() -> void { if (state_ == State::Calling) { @@ -70,17 +89,17 @@ void RateLimitFilter::setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& ca }); } -void RateLimitFilter::complete(RateLimit::LimitStatus status) { +void Filter::complete(::RateLimit::LimitStatus status) { state_ = State::Complete; switch (status) { - case RateLimit::LimitStatus::OK: + case ::RateLimit::LimitStatus::OK: config_->stats().counter(cluster_ratelimit_stat_prefix_ + "ok").inc(); break; - case RateLimit::LimitStatus::Error: + case ::RateLimit::LimitStatus::Error: config_->stats().counter(cluster_ratelimit_stat_prefix_ + "error").inc(); break; - case RateLimit::LimitStatus::OverLimit: + case ::RateLimit::LimitStatus::OverLimit: config_->stats().counter(cluster_ratelimit_stat_prefix_ + "over_limit").inc(); Http::CodeUtility::ResponseStatInfo info{config_->stats(), cluster_stat_prefix_, TOO_MANY_REQUESTS_HEADER, true, EMPTY_STRING, @@ -89,7 +108,7 @@ void RateLimitFilter::complete(RateLimit::LimitStatus status) { break; } - if (status == RateLimit::LimitStatus::OverLimit && + if (status == ::RateLimit::LimitStatus::OverLimit && config_->runtime().snapshot().featureEnabled("ratelimit.http_filter_enforcing", 100)) { state_ = State::Responded; Http::HeaderMapPtr response_headers{new HeaderMapImpl(TOO_MANY_REQUESTS_HEADER)}; @@ -99,4 +118,5 @@ void RateLimitFilter::complete(RateLimit::LimitStatus status) { } } +} // RateLimit } // Http diff --git a/source/common/http/filter/ratelimit.h b/source/common/http/filter/ratelimit.h index 66a0d8d1023db..5ae78cea4e6eb 100644 --- a/source/common/http/filter/ratelimit.h +++ b/source/common/http/filter/ratelimit.h @@ -8,15 +8,50 @@ #include 
"common/json/json_loader.h" namespace Http { +namespace RateLimit { + +class FilterConfig; + +/** + * Generic rate limit action that the filter performs. + */ +class Action { +public: + virtual ~Action() {} + + /** + * Potentially populate the descriptor array with new descriptors to query. + * @param route supplies the target route for the request. + * @param descriptors supplies the descriptor array to optionally fill. + * @param config supplies the filter configuration. + */ + virtual void populateDescriptors(const Router::RouteEntry& route, + std::vector<::RateLimit::Descriptor>& descriptors, + FilterConfig& config) PURE; +}; + +typedef std::unique_ptr<Action> ActionPtr; + +/** + * Action for service to service rate limiting. + */ +class ServiceToServiceAction : public Action { +public: + // Action + void populateDescriptors(const Router::RouteEntry& route, + std::vector<::RateLimit::Descriptor>& descriptors, + FilterConfig& config) override; +}; /** * Global configuration for the HTTP rate limit filter. */ -class RateLimitFilterConfig { +class FilterConfig { public: - RateLimitFilterConfig(const Json::Object& config, const std::string& local_service_cluster, - Stats::Store& stats_store, Runtime::Loader& runtime); + FilterConfig(const Json::Object& config, const std::string& local_service_cluster, + Stats::Store& stats_store, Runtime::Loader& runtime); + const std::vector<ActionPtr>& actions() { return actions_; } const std::string& domain() { return domain_; } const std::string& localServiceCluster() { return local_service_cluster_; } Runtime::Loader& runtime() { return runtime_; } @@ -27,17 +62,18 @@ class RateLimitFilterConfig { const std::string local_service_cluster_; Stats::Store& stats_store_; Runtime::Loader& runtime_; + std::vector<ActionPtr> actions_; }; -typedef std::shared_ptr<RateLimitFilterConfig> RateLimitFilterConfigPtr; +typedef std::shared_ptr<FilterConfig> FilterConfigPtr; /** * HTTP rate limit filter. 
Depending on the route configuration, this filter calls the global * rate limiting service before allowing further filter iteration. */ -class RateLimitFilter : public StreamDecoderFilter, public RateLimit::RequestCallbacks { +class Filter : public StreamDecoderFilter, public ::RateLimit::RequestCallbacks { public: - RateLimitFilter(RateLimitFilterConfigPtr config, RateLimit::ClientPtr&& client) + Filter(FilterConfigPtr config, ::RateLimit::ClientPtr&& client) : config_(config), client_(std::move(client)) {} // Http::StreamDecoderFilter @@ -47,15 +83,15 @@ class RateLimitFilter : public StreamDecoderFilter, public RateLimit::RequestCal void setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& callbacks) override; // RateLimit::RequestCallbacks - void complete(RateLimit::LimitStatus status) override; + void complete(::RateLimit::LimitStatus status) override; private: enum class State { NotStarted, Calling, Complete, Responded }; static const Http::HeaderMapImpl TOO_MANY_REQUESTS_HEADER; - RateLimitFilterConfigPtr config_; - RateLimit::ClientPtr client_; + FilterConfigPtr config_; + ::RateLimit::ClientPtr client_; StreamDecoderFilterCallbacks* callbacks_{}; bool initiating_call_{}; State state_{State::NotStarted}; @@ -63,4 +99,5 @@ class RateLimitFilter : public StreamDecoderFilter, public RateLimit::RequestCal std::string cluster_stat_prefix_; }; +} // RateLimit } // Http diff --git a/source/server/config/http/ratelimit.cc b/source/server/config/http/ratelimit.cc index b860ff4b111a4..edfd76f42bd28 100644 --- a/source/server/config/http/ratelimit.cc +++ b/source/server/config/http/ratelimit.cc @@ -18,10 +18,10 @@ class RateLimitFilterConfig : public HttpFilterConfigFactory { return nullptr; } - Http::RateLimitFilterConfigPtr filter_config(new Http::RateLimitFilterConfig( + Http::RateLimit::FilterConfigPtr filter_config(new Http::RateLimit::FilterConfig( config, server.options().serviceClusterName(), server.stats(), server.runtime())); return [filter_config, 
&server](Http::FilterChainFactoryCallbacks& callbacks) -> void { - callbacks.addStreamDecoderFilter(Http::StreamDecoderFilterPtr{new Http::RateLimitFilter( + callbacks.addStreamDecoderFilter(Http::StreamDecoderFilterPtr{new Http::RateLimit::Filter( filter_config, server.rateLimitClient(std::chrono::milliseconds(20)))}); }; } diff --git a/test/common/http/filter/ratelimit_test.cc b/test/common/http/filter/ratelimit_test.cc index 2e558cebfc060..41f16803e1667 100644 --- a/test/common/http/filter/ratelimit_test.cc +++ b/test/common/http/filter/ratelimit_test.cc @@ -14,13 +14,33 @@ using testing::Return; using testing::WithArgs; namespace Http { +namespace RateLimit { + +TEST(HttpRateLimitFilterBadConfigTest, All) { + std::string json = R"EOF( + { + "domain": "foo", + "actions": [ + {"type": "foo"} + ] + } + )EOF"; + + Json::StringLoader config(json); + Stats::IsolatedStoreImpl stats_store; + NiceMock<Runtime::MockLoader> runtime; + EXPECT_THROW(FilterConfig(config, "service_cluster", stats_store, runtime), EnvoyException); +} class HttpRateLimitFilterTest : public testing::Test { public: HttpRateLimitFilterTest() { std::string json = R"EOF( { - "domain": "foo" + "domain": "foo", + "actions": [ + {"type": "service_to_service"} + ] } )EOF"; @@ -30,18 +50,18 @@ class HttpRateLimitFilterTest : public testing::Test { .WillByDefault(Return(true)); Json::StringLoader config(json); - config_.reset(new RateLimitFilterConfig(config, "service_cluster", stats_store_, runtime_)); + config_.reset(new FilterConfig(config, "service_cluster", stats_store_, runtime_)); - client_ = new RateLimit::MockClient(); - filter_.reset(new RateLimitFilter(config_, RateLimit::ClientPtr{client_})); + client_ = new ::RateLimit::MockClient(); + filter_.reset(new Filter(config_, ::RateLimit::ClientPtr{client_})); filter_->setDecoderFilterCallbacks(filter_callbacks_); } - RateLimitFilterConfigPtr config_; - RateLimit::MockClient* client_; - std::unique_ptr<RateLimitFilter> filter_; + FilterConfigPtr config_; + ::RateLimit::MockClient* 
client_; + std::unique_ptr<Filter> filter_; NiceMock<MockStreamDecoderFilterCallbacks> filter_callbacks_; - RateLimit::RequestCallbacks* request_callbacks_{}; + ::RateLimit::RequestCallbacks* request_callbacks_{}; HeaderMapImpl request_headers_; Buffer::OwnedImpl data_; Stats::IsolatedStoreImpl stats_store_; @@ -77,10 +97,10 @@ TEST_F(HttpRateLimitFilterTest, OkResponse) { EXPECT_CALL(*client_, limit(_, "foo", - testing::ContainerEq(std::vector<RateLimit::Descriptor>{ + testing::ContainerEq(std::vector<::RateLimit::Descriptor>{ {{{"to_cluster", "fake_cluster"}}}, {{{"to_cluster", "fake_cluster"}, {"from_cluster", "service_cluster"}}}}))) - .WillOnce(WithArgs<0>(Invoke([&](RateLimit::RequestCallbacks& callbacks) + .WillOnce(WithArgs<0>(Invoke([&](::RateLimit::RequestCallbacks& callbacks) -> void { request_callbacks_ = &callbacks; }))); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); @@ -88,7 +108,7 @@ EXPECT_EQ(FilterTrailersStatus::StopIteration, filter_->decodeTrailers(request_headers_)); EXPECT_CALL(filter_callbacks_, continueDecoding()); - request_callbacks_->complete(RateLimit::LimitStatus::OK); + request_callbacks_->complete(::RateLimit::LimitStatus::OK); EXPECT_EQ(1U, stats_store_.counter("cluster.fake_cluster.ratelimit.ok").value()); } @@ -100,11 +120,11 @@ TEST_F(HttpRateLimitFilterTest, ImmediateOkResponse) { EXPECT_CALL(*client_, limit(_, "foo", - testing::ContainerEq(std::vector<RateLimit::Descriptor>{ + testing::ContainerEq(std::vector<::RateLimit::Descriptor>{ {{{"to_cluster", "fake_cluster"}}}, {{{"to_cluster", "fake_cluster"}, {"from_cluster", "service_cluster"}}}}))) - .WillOnce(WithArgs<0>(Invoke([&](RateLimit::RequestCallbacks& callbacks) -> void { - callbacks.complete(RateLimit::LimitStatus::OK); + .WillOnce(WithArgs<0>(Invoke([&](::RateLimit::RequestCallbacks& callbacks) -> void { + callbacks.complete(::RateLimit::LimitStatus::OK); }))); EXPECT_CALL(filter_callbacks_, continueDecoding()).Times(0); @@ -121,13 +141,13 @@ 
TEST_F(HttpRateLimitFilterTest, ErrorResponse) { filter_callbacks_.route_table_.route_entry_.rate_limit_policy_.do_global_limiting_ = true; EXPECT_CALL(*client_, limit(_, _, _)) - .WillOnce(WithArgs<0>(Invoke([&](RateLimit::RequestCallbacks& callbacks) + .WillOnce(WithArgs<0>(Invoke([&](::RateLimit::RequestCallbacks& callbacks) -> void { request_callbacks_ = &callbacks; }))); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); EXPECT_CALL(filter_callbacks_, continueDecoding()); - request_callbacks_->complete(RateLimit::LimitStatus::Error); + request_callbacks_->complete(::RateLimit::LimitStatus::Error); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, false)); EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_headers_)); @@ -141,7 +161,7 @@ TEST_F(HttpRateLimitFilterTest, LimitResponse) { filter_callbacks_.route_table_.route_entry_.rate_limit_policy_.do_global_limiting_ = true; EXPECT_CALL(*client_, limit(_, _, _)) - .WillOnce(WithArgs<0>(Invoke([&](RateLimit::RequestCallbacks& callbacks) + .WillOnce(WithArgs<0>(Invoke([&](::RateLimit::RequestCallbacks& callbacks) -> void { request_callbacks_ = &callbacks; }))); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); @@ -149,7 +169,7 @@ TEST_F(HttpRateLimitFilterTest, LimitResponse) { Http::HeaderMapImpl response_headers{{":status", "429"}}; EXPECT_CALL(filter_callbacks_, encodeHeaders_(HeaderMapEqualRef(response_headers), true)); EXPECT_CALL(filter_callbacks_, continueDecoding()).Times(0); - request_callbacks_->complete(RateLimit::LimitStatus::OverLimit); + request_callbacks_->complete(::RateLimit::LimitStatus::OverLimit); EXPECT_EQ(1U, stats_store_.counter("cluster.fake_cluster.ratelimit.over_limit").value()); EXPECT_EQ(1U, stats_store_.counter("cluster.fake_cluster.upstream_rq_4xx").value()); @@ -162,7 +182,7 @@ TEST_F(HttpRateLimitFilterTest, LimitResponseRuntimeDisabled) { 
filter_callbacks_.route_table_.route_entry_.rate_limit_policy_.do_global_limiting_ = true; EXPECT_CALL(*client_, limit(_, _, _)) - .WillOnce(WithArgs<0>(Invoke([&](RateLimit::RequestCallbacks& callbacks) + .WillOnce(WithArgs<0>(Invoke([&](::RateLimit::RequestCallbacks& callbacks) -> void { request_callbacks_ = &callbacks; }))); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); @@ -170,7 +190,7 @@ TEST_F(HttpRateLimitFilterTest, LimitResponseRuntimeDisabled) { EXPECT_CALL(runtime_.snapshot_, featureEnabled("ratelimit.http_filter_enforcing", 100)) .WillOnce(Return(false)); EXPECT_CALL(filter_callbacks_, continueDecoding()); - request_callbacks_->complete(RateLimit::LimitStatus::OverLimit); + request_callbacks_->complete(::RateLimit::LimitStatus::OverLimit); EXPECT_EQ(FilterDataStatus::Continue, filter_->decodeData(data_, false)); EXPECT_EQ(FilterTrailersStatus::Continue, filter_->decodeTrailers(request_headers_)); @@ -186,7 +206,7 @@ TEST_F(HttpRateLimitFilterTest, ResetDuringCall) { filter_callbacks_.route_table_.route_entry_.rate_limit_policy_.do_global_limiting_ = true; EXPECT_CALL(*client_, limit(_, _, _)) - .WillOnce(WithArgs<0>(Invoke([&](RateLimit::RequestCallbacks& callbacks) + .WillOnce(WithArgs<0>(Invoke([&](::RateLimit::RequestCallbacks& callbacks) -> void { request_callbacks_ = &callbacks; }))); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); @@ -195,4 +215,5 @@ TEST_F(HttpRateLimitFilterTest, ResetDuringCall) { filter_callbacks_.reset_callback_(); } +} // RateLimit } // Http diff --git a/test/config/integration/server.json b/test/config/integration/server.json index fc908e6ef05ca..1501e6a0c9c6d 100644 --- a/test/config/integration/server.json +++ b/test/config/integration/server.json @@ -80,7 +80,14 @@ "pass_through_mode": false, "endpoint": "/healthcheck" } }, - { "type": "decoder", "name": "rate_limit", "config": { "domain": "foo" } }, + { "type": "decoder", 
"name": "rate_limit", + "config": { + "domain": "foo", + "actions": [ + {"type": "service_to_service"} + ] + } + }, { "type": "decoder", "name": "router", "config": {} } ] } @@ -153,7 +160,14 @@ "pass_through_mode": false, "endpoint": "/healthcheck" } }, - { "type": "decoder", "name": "rate_limit", "config": { "domain": "foo" } }, + { "type": "decoder", "name": "rate_limit", + "config": { + "domain": "foo", + "actions": [ + {"type": "service_to_service"} + ] + } + }, { "type": "decoder", "name": "buffer", "config": { "max_request_bytes": 5242880,