diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index c6341c8cd8ddf..bce06b055a049 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -41,9 +41,7 @@ EXTENSIONS = { # "envoy.filters.http.adaptive_concurrency": "//source/extensions/filters/http/adaptive_concurrency:config", - # NOTE: The admission control filter does not have a proper filter - # implemented right now. We are just referencing the filter lib here. - "envoy.filters.http.admission_control": "//source/extensions/filters/http/admission_control:admission_control_filter_lib", + "envoy.filters.http.admission_control": "//source/extensions/filters/http/admission_control:config", "envoy.filters.http.aws_lambda": "//source/extensions/filters/http/aws_lambda:config", "envoy.filters.http.aws_request_signing": "//source/extensions/filters/http/aws_request_signing:config", "envoy.filters.http.buffer": "//source/extensions/filters/http/buffer:config", diff --git a/source/extensions/filters/http/admission_control/BUILD b/source/extensions/filters/http/admission_control/BUILD index cb4a9975b09b6..2bfdfb9912a69 100644 --- a/source/extensions/filters/http/admission_control/BUILD +++ b/source/extensions/filters/http/admission_control/BUILD @@ -15,6 +15,7 @@ envoy_cc_extension( name = "admission_control_filter_lib", srcs = [ "admission_control.cc", + "thread_local_controller.cc", ], hdrs = [ "admission_control.h", @@ -33,3 +34,20 @@ envoy_cc_extension( "@envoy_api//envoy/extensions/filters/http/admission_control/v3alpha:pkg_cc_proto", ], ) + +envoy_cc_extension( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + security_posture = "unknown", + status = "alpha", + deps = [ + "//include/envoy/registry", + "//source/common/common:enum_to_int", + "//source/extensions/filters/http:well_known_names", + "//source/extensions/filters/http/admission_control:admission_control_filter_lib", + "//source/extensions/filters/http/admission_control/evaluators:response_evaluator_lib", + "//source/extensions/filters/http/common:factory_base_lib", + "@envoy_api//envoy/extensions/filters/http/admission_control/v3alpha:pkg_cc_proto", + ], +) diff --git a/source/extensions/filters/http/admission_control/admission_control.cc b/source/extensions/filters/http/admission_control/admission_control.cc index 7953b79c36f1c..8886d73596f19 100644 --- a/source/extensions/filters/http/admission_control/admission_control.cc +++ b/source/extensions/filters/http/admission_control/admission_control.cc @@ -31,7 +31,7 @@ using GrpcStatus = Grpc::Status::GrpcStatus; static constexpr double defaultAggression = 2.0; AdmissionControlFilterConfig::AdmissionControlFilterConfig( - const AdmissionControlProto& proto_config, Runtime::Loader& runtime, TimeSource&, + const AdmissionControlProto& proto_config, Runtime::Loader& runtime, Runtime::RandomGenerator& random, Stats::Scope& scope, ThreadLocal::SlotPtr&& tls, std::shared_ptr response_evaluator) : random_(random), scope_(scope), tls_(std::move(tls)), @@ -122,8 +122,9 @@ AdmissionControlFilter::encodeTrailers(Http::ResponseTrailerMap& trailers) { } bool AdmissionControlFilter::shouldRejectRequest() const { - const double total = config_->getController().requestTotalCount(); - const double success = config_->getController().requestSuccessCount(); + const auto request_counts = config_->getController().requestCounts(); + const double total = request_counts.requests; + const double success = request_counts.successes; const double probability = (total - config_->aggression() * success) / (total + 1); // Choosing an accuracy of 4 significant figures for the probability. diff --git a/source/extensions/filters/http/admission_control/admission_control.h b/source/extensions/filters/http/admission_control/admission_control.h index 22edcf5393961..a962096ae8cea 100644 --- a/source/extensions/filters/http/admission_control/admission_control.h +++ b/source/extensions/filters/http/admission_control/admission_control.h @@ -49,12 +49,14 @@ using AdmissionControlProto = class AdmissionControlFilterConfig { public: AdmissionControlFilterConfig(const AdmissionControlProto& proto_config, Runtime::Loader& runtime, - TimeSource&, Runtime::RandomGenerator& random, Stats::Scope& scope, + Runtime::RandomGenerator& random, Stats::Scope& scope, ThreadLocal::SlotPtr&& tls, std::shared_ptr response_evaluator); virtual ~AdmissionControlFilterConfig() = default; - virtual ThreadLocalController& getController() const { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + virtual ThreadLocalController& getController() const { + return tls_->getTyped(); + } Runtime::RandomGenerator& random() const { return random_; } bool filterEnabled() const { return admission_control_feature_.enabled(); } diff --git a/source/extensions/filters/http/admission_control/config.cc b/source/extensions/filters/http/admission_control/config.cc new file mode 100644 index 0000000000000..297fabf4f6d71 --- /dev/null +++ b/source/extensions/filters/http/admission_control/config.cc @@ -0,0 +1,64 @@ +#include "extensions/filters/http/admission_control/config.h" + +#include "envoy/extensions/filters/http/admission_control/v3alpha/admission_control.pb.h" +#include "envoy/extensions/filters/http/admission_control/v3alpha/admission_control.pb.validate.h" +#include "envoy/registry/registry.h" + +#include "common/common/enum_to_int.h" + +#include "extensions/filters/http/admission_control/admission_control.h" +#include "extensions/filters/http/admission_control/evaluators/response_evaluator.h" +#include "extensions/filters/http/admission_control/evaluators/success_criteria_evaluator.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdmissionControl { + +static constexpr std::chrono::seconds defaultSamplingWindow{120}; + +Http::FilterFactoryCb AdmissionControlFilterFactory::createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::http::admission_control::v3alpha::AdmissionControl& config, + const std::string& stats_prefix, Server::Configuration::FactoryContext& context) { + + const std::string prefix = stats_prefix + "admission_control."; + + // Create the thread-local controller. + auto tls = context.threadLocal().allocateSlot(); + auto sampling_window = std::chrono::seconds( + PROTOBUF_GET_MS_OR_DEFAULT(config, sampling_window, 1000 * defaultSamplingWindow.count()) / + 1000); + tls->set( + [sampling_window, &context](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { + return std::make_shared(context.timeSource(), sampling_window); + }); + + std::unique_ptr response_evaluator; + switch (config.evaluation_criteria_case()) { + case AdmissionControlProto::EvaluationCriteriaCase::kSuccessCriteria: + response_evaluator = std::make_unique(config.success_criteria()); + break; + case AdmissionControlProto::EvaluationCriteriaCase::EVALUATION_CRITERIA_NOT_SET: + NOT_REACHED_GCOVR_EXCL_LINE; + } + + AdmissionControlFilterConfigSharedPtr filter_config = + std::make_shared(config, context.runtime(), context.random(), + context.scope(), std::move(tls), + std::move(response_evaluator)); + + return [filter_config, prefix](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamFilter(std::make_shared(filter_config, prefix)); + }; +} + +/** + * Static registration for the admission_control filter. @see RegisterFactory. + */ +REGISTER_FACTORY(AdmissionControlFilterFactory, + Server::Configuration::NamedHttpFilterConfigFactory); + +} // namespace AdmissionControl +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/admission_control/config.h b/source/extensions/filters/http/admission_control/config.h new file mode 100644 index 0000000000000..8abe84eafefcb --- /dev/null +++ b/source/extensions/filters/http/admission_control/config.h @@ -0,0 +1,32 @@ +#pragma once + +#include "envoy/extensions/filters/http/admission_control/v3alpha/admission_control.pb.h" +#include "envoy/extensions/filters/http/admission_control/v3alpha/admission_control.pb.validate.h" + +#include "extensions/filters/http/common/factory_base.h" +#include "extensions/filters/http/well_known_names.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdmissionControl { + +/** + * Config registration for the adaptive concurrency limit filter. @see NamedHttpFilterConfigFactory. + */ +class AdmissionControlFilterFactory + : public Common::FactoryBase< + envoy::extensions::filters::http::admission_control::v3alpha::AdmissionControl> { +public: + AdmissionControlFilterFactory() : FactoryBase(HttpFilterNames::get().AdmissionControl) {} + + Http::FilterFactoryCb createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::http::admission_control::v3alpha::AdmissionControl& + proto_config, + const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override; +}; + +} // namespace AdmissionControl +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/admission_control/thread_local_controller.cc b/source/extensions/filters/http/admission_control/thread_local_controller.cc new file mode 100644 index 0000000000000..30f0aac40061c --- /dev/null +++ b/source/extensions/filters/http/admission_control/thread_local_controller.cc @@ -0,0 +1,49 @@ +#include "extensions/filters/http/admission_control/thread_local_controller.h" + +#include + +#include "envoy/common/pure.h" +#include "envoy/common/time.h" +#include "envoy/http/codes.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdmissionControl { + +static constexpr std::chrono::seconds defaultHistoryGranularity{1}; + +ThreadLocalControllerImpl::ThreadLocalControllerImpl(TimeSource& time_source, + std::chrono::seconds sampling_window) + : time_source_(time_source), sampling_window_(sampling_window) {} + +void ThreadLocalControllerImpl::maybeUpdateHistoricalData() { + // Purge stale samples. + while (!historical_data_.empty() && ageOfOldestSample() >= sampling_window_) { + removeOldestSample(); + } + + // It's possible we purged stale samples from the history and are left with nothing, so it's + // necessary to add an empty entry. We will also need to roll over into a new entry in the + // historical data if we've exceeded the time specified by the granularity. + if (historical_data_.empty() || ageOfNewestSample() >= defaultHistoryGranularity) { + historical_data_.emplace_back(time_source_.monotonicTime(), RequestData()); + } +} + +void ThreadLocalControllerImpl::recordRequest(bool success) { + maybeUpdateHistoricalData(); + + // The back of the deque will be the most recent samples. + ++historical_data_.back().second.requests; + ++global_data_.requests; + if (success) { + ++historical_data_.back().second.successes; + ++global_data_.successes; + } +} + +} // namespace AdmissionControl +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/admission_control/thread_local_controller.h b/source/extensions/filters/http/admission_control/thread_local_controller.h index 9b5096b805696..11f9387581779 100644 --- a/source/extensions/filters/http/admission_control/thread_local_controller.h +++ b/source/extensions/filters/http/admission_control/thread_local_controller.h @@ -15,6 +15,19 @@ namespace AdmissionControl { */ class ThreadLocalController { public: + struct RequestData { + RequestData(uint32_t request_count, uint32_t success_count) + : requests(request_count), successes(success_count) {} + RequestData() = default; + + inline bool operator==(const RequestData& rhs) const { + return (requests == rhs.requests) && (successes == rhs.successes); + } + + uint32_t requests{0}; + uint32_t successes{0}; + }; + virtual ~ThreadLocalController() = default; // Record success/failure of a request and update the internal state of the controller to reflect @@ -22,11 +35,76 @@ class ThreadLocalController { virtual void recordSuccess() PURE; virtual void recordFailure() PURE; - // Returns the current number of recorded requests. - virtual uint32_t requestTotalCount() PURE; + // Returns the current number of requests and how many of them are successful. + virtual RequestData requestCounts() PURE; +}; + +/** + * Thread-local object to track request counts and successes over a rolling time window. Request + * data for the time window is kept recent via a circular buffer that phases out old request/success + * counts when recording new samples. + * + * This controller is thread-local so that we do not need to take any locks on the sample histories + * to update them, at the cost of decreasing the number of samples. + * + * The look-back window for request samples is accurate up to a hard-coded 1-second granularity. + * TODO (tonya11en): Allow the granularity to be configurable. + */ +class ThreadLocalControllerImpl : public ThreadLocalController, + public ThreadLocal::ThreadLocalObject { +public: + ThreadLocalControllerImpl(TimeSource& time_source, std::chrono::seconds sampling_window); + ~ThreadLocalControllerImpl() override = default; + void recordSuccess() override { recordRequest(true); } + void recordFailure() override { recordRequest(false); } + + RequestData requestCounts() override { + maybeUpdateHistoricalData(); + return global_data_; + } + +private: + void recordRequest(bool success); + + // Potentially remove any stale samples and record sample aggregates to the historical data. + void maybeUpdateHistoricalData(); + + // Returns the age of the oldest sample in the historical data. + std::chrono::microseconds ageOfOldestSample() const { + ASSERT(!historical_data_.empty()); + using namespace std::chrono; + return duration_cast(time_source_.monotonicTime() - + historical_data_.front().first); + } + + // Returns the age of the newest sample in the historical data. + std::chrono::microseconds ageOfNewestSample() const { + ASSERT(!historical_data_.empty()); + using namespace std::chrono; + return duration_cast(time_source_.monotonicTime() - + historical_data_.back().first); + } + + // Removes the oldest sample in the historical data and reconciles the global data. + void removeOldestSample() { + ASSERT(!historical_data_.empty()); + global_data_.successes -= historical_data_.front().second.successes; + global_data_.requests -= historical_data_.front().second.requests; + historical_data_.pop_front(); + } + + TimeSource& time_source_; + + // Stores samples from oldest (front) to newest (back). Since there is no need to read/modify + // entries that are not the oldest or newest (front/back), we can get away with using a deque + // which allocates memory in chunks and keeps most elements contiguous and cache-friendly. + std::deque> historical_data_; + + // Request data aggregated for the whole look-back window. + RequestData global_data_; - // Returns the current number of recorded request successes. - virtual uint32_t requestSuccessCount() PURE; + // The rolling time window size. + const std::chrono::seconds sampling_window_; }; } // namespace AdmissionControl diff --git a/test/extensions/filters/http/admission_control/BUILD b/test/extensions/filters/http/admission_control/BUILD index b161f26e16a14..1b9595276119a 100644 --- a/test/extensions/filters/http/admission_control/BUILD +++ b/test/extensions/filters/http/admission_control/BUILD @@ -55,3 +55,27 @@ envoy_extension_cc_test( "@envoy_api//envoy/extensions/filters/http/admission_control/v3alpha:pkg_cc_proto", ], ) + +envoy_extension_cc_test( + name = "admission_control_integration_test", + srcs = ["admission_control_integration_test.cc"], + extension_name = "envoy.filters.http.admission_control", + deps = [ + "//source/extensions/filters/http/admission_control:config", + "//test/integration:http_integration_lib", + "//test/test_common:utility_lib", + ], +) + +envoy_extension_cc_test( + name = "admission_controller_test", + srcs = ["controller_test.cc"], + extension_name = "envoy.filters.http.admission_control", + deps = [ + "//source/common/http:headers_lib", + "//source/extensions/filters/http/admission_control:admission_control_filter_lib", + "//test/test_common:simulated_time_system_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/extensions/filters/http/admission_control/v3alpha:pkg_cc_proto", + ], +) diff --git a/test/extensions/filters/http/admission_control/admission_control_filter_test.cc b/test/extensions/filters/http/admission_control/admission_control_filter_test.cc index ad1c3ca28543c..0b188f78c1397 100644 --- a/test/extensions/filters/http/admission_control/admission_control_filter_test.cc +++ b/test/extensions/filters/http/admission_control/admission_control_filter_test.cc @@ -29,11 +29,12 @@ namespace HttpFilters { namespace AdmissionControl { namespace { +using RequestData = ThreadLocalController::RequestData; + class MockThreadLocalController : public ThreadLocal::ThreadLocalObject, public ThreadLocalController { public: - MOCK_METHOD(uint32_t, requestTotalCount, ()); - MOCK_METHOD(uint32_t, requestSuccessCount, ()); + MOCK_METHOD(RequestData, requestCounts, ()); MOCK_METHOD(void, recordSuccess, ()); MOCK_METHOD(void, recordFailure, ()); }; @@ -47,11 +48,10 @@ class MockResponseEvaluator : public ResponseEvaluator { class TestConfig : public AdmissionControlFilterConfig { public: TestConfig(const AdmissionControlProto& proto_config, Runtime::Loader& runtime, - TimeSource& time_source, Runtime::RandomGenerator& random, Stats::Scope& scope, - ThreadLocal::SlotPtr&& tls, MockThreadLocalController& controller, - std::shared_ptr evaluator) - : AdmissionControlFilterConfig(proto_config, runtime, time_source, random, scope, - std::move(tls), std::move(evaluator)), + Runtime::RandomGenerator& random, Stats::Scope& scope, ThreadLocal::SlotPtr&& tls, + MockThreadLocalController& controller, std::shared_ptr evaluator) + : AdmissionControlFilterConfig(proto_config, runtime, random, scope, std::move(tls), + std::move(evaluator)), controller_(controller) {} ThreadLocalController& getController() const override { return controller_; } @@ -69,8 +69,8 @@ class AdmissionControlTest : public testing::Test { auto tls = context_.threadLocal().allocateSlot(); evaluator_ = std::make_shared(); - return std::make_shared(proto, runtime_, time_system_, random_, scope_, - std::move(tls), controller_, evaluator_); + return std::make_shared(proto, runtime_, random_, scope_, std::move(tls), + controller_, evaluator_); } void setupFilter(std::shared_ptr config) { @@ -145,8 +145,7 @@ sampling_window: 10s EXPECT_CALL(runtime_.snapshot_, getBoolean("foo.enabled", true)).WillRepeatedly(Return(false)); // The filter is bypassed via runtime. - EXPECT_CALL(controller_, requestTotalCount()).Times(0); - EXPECT_CALL(controller_, requestSuccessCount()).Times(0); + EXPECT_CALL(controller_, requestCounts()).Times(0); // We expect no rejections. Http::TestRequestHeaderMapImpl request_headers; @@ -164,8 +163,7 @@ TEST_F(AdmissionControlTest, DisregardHealthChecks) { // We do not make admission decisions for health checks, so we expect no lookup of request success // counts. - EXPECT_CALL(controller_, requestTotalCount()).Times(0); - EXPECT_CALL(controller_, requestSuccessCount()).Times(0); + EXPECT_CALL(controller_, requestCounts()).Times(0); Http::TestRequestHeaderMapImpl request_headers; Http::TestResponseHeaderMapImpl response_headers{{":status", "200"}}; @@ -181,8 +179,7 @@ TEST_F(AdmissionControlTest, HttpFailureBehavior) { // We expect rejection counter to increment upon failure. TestUtility::waitForCounterEq(scope_, "test_prefix.rq_rejected", 0, time_system_); - EXPECT_CALL(controller_, requestTotalCount()).WillRepeatedly(Return(100)); - EXPECT_CALL(controller_, requestSuccessCount()).WillRepeatedly(Return(0)); + EXPECT_CALL(controller_, requestCounts()).WillRepeatedly(Return(RequestData(100, 0))); EXPECT_CALL(*evaluator_, isHttpSuccess(500)).WillRepeatedly(Return(false)); Http::TestRequestHeaderMapImpl request_headers; @@ -201,8 +198,7 @@ TEST_F(AdmissionControlTest, HttpSuccessBehavior) { // We expect rejection counter to NOT increment upon success. TestUtility::waitForCounterEq(scope_, "test_prefix.rq_rejected", 0, time_system_); - EXPECT_CALL(controller_, requestTotalCount()).WillRepeatedly(Return(100)); - EXPECT_CALL(controller_, requestSuccessCount()).WillRepeatedly(Return(100)); + EXPECT_CALL(controller_, requestCounts()).WillRepeatedly(Return(RequestData(100, 100))); EXPECT_CALL(*evaluator_, isHttpSuccess(200)).WillRepeatedly(Return(true)); Http::TestRequestHeaderMapImpl request_headers; @@ -219,8 +215,7 @@ TEST_F(AdmissionControlTest, GrpcFailureBehavior) { TestUtility::waitForCounterEq(scope_, "test_prefix.rq_rejected", 0, time_system_); - EXPECT_CALL(controller_, requestTotalCount()).WillRepeatedly(Return(100)); - EXPECT_CALL(controller_, requestSuccessCount()).WillRepeatedly(Return(0)); + EXPECT_CALL(controller_, requestCounts()).WillRepeatedly(Return(RequestData(100, 0))); EXPECT_CALL(*evaluator_, isGrpcSuccess(7)).WillRepeatedly(Return(false)); Http::TestRequestHeaderMapImpl request_headers; @@ -239,8 +234,7 @@ TEST_F(AdmissionControlTest, GrpcSuccessBehaviorTrailer) { TestUtility::waitForCounterEq(scope_, "test_prefix.rq_rejected", 0, time_system_); - EXPECT_CALL(controller_, requestTotalCount()).WillRepeatedly(Return(100)); - EXPECT_CALL(controller_, requestSuccessCount()).WillRepeatedly(Return(100)); + EXPECT_CALL(controller_, requestCounts()).WillRepeatedly(Return(RequestData(100, 100))); EXPECT_CALL(*evaluator_, isGrpcSuccess(0)).WillRepeatedly(Return(true)); Http::TestRequestHeaderMapImpl request_headers; @@ -258,8 +252,7 @@ TEST_F(AdmissionControlTest, GrpcFailureBehaviorTrailer) { TestUtility::waitForCounterEq(scope_, "test_prefix.rq_rejected", 0, time_system_); - EXPECT_CALL(controller_, requestTotalCount()).WillRepeatedly(Return(100)); - EXPECT_CALL(controller_, requestSuccessCount()).WillRepeatedly(Return(0)); + EXPECT_CALL(controller_, requestCounts()).WillRepeatedly(Return(RequestData(100, 0))); EXPECT_CALL(*evaluator_, isGrpcSuccess(7)).WillRepeatedly(Return(false)); Http::TestRequestHeaderMapImpl request_headers; @@ -278,8 +271,7 @@ TEST_F(AdmissionControlTest, GrpcSuccessBehavior) { TestUtility::waitForCounterEq(scope_, "test_prefix.rq_rejected", 0, time_system_); - EXPECT_CALL(controller_, requestTotalCount()).WillRepeatedly(Return(100)); - EXPECT_CALL(controller_, requestSuccessCount()).WillRepeatedly(Return(100)); + EXPECT_CALL(controller_, requestCounts()).WillRepeatedly(Return(RequestData(100, 100))); EXPECT_CALL(*evaluator_, isGrpcSuccess(0)).WillRepeatedly(Return(true)); Http::TestRequestHeaderMapImpl request_headers; diff --git a/test/extensions/filters/http/admission_control/admission_control_integration_test.cc b/test/extensions/filters/http/admission_control/admission_control_integration_test.cc new file mode 100644 index 0000000000000..a361cbff4d092 --- /dev/null +++ b/test/extensions/filters/http/admission_control/admission_control_integration_test.cc @@ -0,0 +1,171 @@ +#include "common/grpc/common.h" + +#include "test/integration/autonomous_upstream.h" +#include "test/integration/http_integration.h" +#include "test/test_common/simulated_time_system.h" +#include "test/test_common/utility.h" + +namespace Envoy { +namespace { + +const std::string ADMISSION_CONTROL_CONFIG = + R"EOF( +name: envoy.filters.http.admission_control +typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.admission_control.v3alpha.AdmissionControl + success_criteria: + http_criteria: + grpc_criteria: + sampling_window: 120s + aggression_coefficient: + default_value: 1.0 + runtime_key: "foo.aggression" + enabled: + default_value: true + runtime_key: "foo.enabled" +)EOF"; + +class AdmissionControlIntegrationTest : public Event::TestUsingSimulatedTime, + public testing::TestWithParam, + public HttpIntegrationTest { +public: + AdmissionControlIntegrationTest() + : HttpIntegrationTest(Http::CodecClient::Type::HTTP1, GetParam(), realTime()) {} + + void SetUp() override {} + + void initialize() override { + config_helper_.addConfigModifier(setEnableDownstreamTrailersHttp1()); + config_helper_.addFilter(ADMISSION_CONTROL_CONFIG); + HttpIntegrationTest::initialize(); + } + +protected: + void verifyGrpcSuccess(IntegrationStreamDecoderPtr response) { + EXPECT_EQ("0", response->trailers()->GrpcStatus()->value().getStringView()); + } + + void verifyHttpSuccess(IntegrationStreamDecoderPtr response) { + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + } + + IntegrationStreamDecoderPtr sendGrpcRequestWithReturnCode(uint64_t code) { + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Set the response headers on the autonomous upstream. + auto headers = std::make_unique(); + headers->setStatus(200); + headers->setContentType("application/grpc"); + + auto trailers = std::make_unique(); + trailers->setGrpcMessage("this is a message"); + trailers->setGrpcStatus(code); + + auto* au = reinterpret_cast(fake_upstreams_.front().get()); + au->setResponseHeaders(std::move(headers)); + au->setResponseTrailers(std::move(trailers)); + + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + response->waitForEndStream(); + codec_client_->close(); + return response; + } + + IntegrationStreamDecoderPtr sendRequestWithReturnCode(std::string&& code) { + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Set the response headers on the autonomous upstream. + auto* au = reinterpret_cast(fake_upstreams_.front().get()); + au->setResponseHeaders(std::make_unique( + Http::TestResponseHeaderMapImpl({{":status", code}}))); + + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + response->waitForEndStream(); + codec_client_->close(); + return response; + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, AdmissionControlIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest())); + +TEST_P(AdmissionControlIntegrationTest, HttpTest) { + autonomous_upstream_ = true; + initialize(); + + // Drop the success rate to a very low value. + ENVOY_LOG(info, "dropping success rate"); + for (int i = 0; i < 1000; ++i) { + sendRequestWithReturnCode("500"); + } + + // Measure throttling rate from the admission control filter. + double throttle_count = 0; + double request_count = 0; + ENVOY_LOG(info, "validating throttling rate"); + for (int i = 0; i < 1000; ++i) { + auto response = sendRequestWithReturnCode("500"); + auto rc = response->headers().Status()->value().getStringView(); + if (rc == "503") { + ++throttle_count; + } else { + ASSERT_EQ(rc, "500"); + } + ++request_count; + } + + // Given the current throttling rate formula with an aggression of 1, it should result in a ~98% + // throttling rate. Allowing an error of 3%. + EXPECT_NEAR(throttle_count / request_count, 0.98, 0.03); + + // We now wait for the history to become stale. + timeSystem().advanceTimeWait(std::chrono::seconds(120)); + + // We expect a 100% success rate after waiting. No throttling should occur. + for (int i = 0; i < 100; ++i) { + verifyHttpSuccess(sendRequestWithReturnCode("200")); + } +} + +TEST_P(AdmissionControlIntegrationTest, GrpcTest) { + autonomous_upstream_ = true; + setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); + initialize(); + + // Drop the success rate to a very low value. + for (int i = 0; i < 1000; ++i) { + sendGrpcRequestWithReturnCode(14); + } + + // Measure throttling rate from the admission control filter. + double throttle_count = 0; + double request_count = 0; + for (int i = 0; i < 1000; ++i) { + auto response = sendGrpcRequestWithReturnCode(10); + + // When the filter is throttling, it returns an HTTP code 503 and the GRPC status is unset. + // Otherwise, we expect a GRPC status of "Unknown" as set above. + if (response->headers().Status()->value().getStringView() == "503") { + ++throttle_count; + } else { + auto grpc_status = Grpc::Common::getGrpcStatus(*(response->trailers())); + ASSERT_EQ(grpc_status, Grpc::Status::WellKnownGrpcStatus::Aborted); + } + ++request_count; + } + + // Given the current throttling rate formula with an aggression of 1, it should result in a ~98% + // throttling rate. Allowing an error of 3%. + EXPECT_NEAR(throttle_count / request_count, 0.98, 0.03); + + // We now wait for the history to become stale. + timeSystem().advanceTimeWait(std::chrono::seconds(120)); + + // We expect a 100% success rate after waiting. No throttling should occur. + for (int i = 0; i < 100; ++i) { + verifyGrpcSuccess(sendGrpcRequestWithReturnCode(0)); + } +} + +} // namespace +} // namespace Envoy diff --git a/test/extensions/filters/http/admission_control/config_test.cc b/test/extensions/filters/http/admission_control/config_test.cc index 2201b3c36cb11..aa716a054e1e9 100644 --- a/test/extensions/filters/http/admission_control/config_test.cc +++ b/test/extensions/filters/http/admission_control/config_test.cc @@ -35,15 +35,14 @@ class AdmissionControlConfigTest : public testing::Test { TestUtility::loadFromYamlAndValidate(yaml, proto); auto tls = context_.threadLocal().allocateSlot(); auto evaluator = std::make_unique(proto.success_criteria()); - return std::make_shared( - proto, runtime_, time_system_, random_, scope_, std::move(tls), std::move(evaluator)); + return std::make_shared(proto, runtime_, random_, scope_, + std::move(tls), std::move(evaluator)); } protected: NiceMock runtime_; NiceMock context_; Stats::IsolatedStoreImpl scope_; - Event::SimulatedTimeSystem time_system_; NiceMock random_; }; diff --git a/test/extensions/filters/http/admission_control/controller_test.cc b/test/extensions/filters/http/admission_control/controller_test.cc new file mode 100644 index 0000000000000..bf88a7037431d --- /dev/null +++ b/test/extensions/filters/http/admission_control/controller_test.cc @@ -0,0 +1,107 @@ +#include + +#include "envoy/extensions/filters/http/admission_control/v3alpha/admission_control.pb.h" +#include "envoy/extensions/filters/http/admission_control/v3alpha/admission_control.pb.validate.h" + +#include "extensions/filters/http/admission_control/thread_local_controller.h" + +#include "test/test_common/simulated_time_system.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdmissionControl { +namespace { + +using RequestData = ThreadLocalController::RequestData; + +class ThreadLocalControllerTest : public testing::Test { +public: + ThreadLocalControllerTest() : window_(5), tlc_(time_system_, window_) {} + +protected: + // Submit a single request per entry in the historical data (this comes out to a single request + // each second). The final sample does not advance time to allow for testing of this transition. + void fillHistorySlots(const bool successes = true) { + std::function record; + if (successes) { + record = [this]() { tlc_.recordSuccess(); }; + } else { + record = [this]() { tlc_.recordFailure(); }; + } + for (int tick = 0; tick < window_.count(); ++tick) { + record(); + time_system_.advanceTimeWait(std::chrono::seconds(1)); + } + // Don't sleep after the final sample to allow for measurements. + record(); + } + + Event::SimulatedTimeSystem time_system_; + std::chrono::seconds window_; + ThreadLocalControllerImpl tlc_; +}; + +// Test the basic functionality of the admission controller. +TEST_F(ThreadLocalControllerTest, BasicRecord) { + EXPECT_EQ(RequestData(0, 0), tlc_.requestCounts()); + + tlc_.recordFailure(); + EXPECT_EQ(RequestData(1, 0), tlc_.requestCounts()); + + tlc_.recordSuccess(); + EXPECT_EQ(RequestData(2, 1), tlc_.requestCounts()); +} + +// Verify that stale historical samples are removed when they grow stale. +TEST_F(ThreadLocalControllerTest, RemoveStaleSamples) { + fillHistorySlots(); + + // We expect a single request counted in each second of the window. + EXPECT_EQ(RequestData(window_.count(), window_.count()), tlc_.requestCounts()); + + time_system_.advanceTimeWait(std::chrono::seconds(1)); + + // Continuing to sample requests at 1 per second should maintain the same request counts. We'll + // record failures here. + fillHistorySlots(false); + EXPECT_EQ(RequestData(window_.count(), 0), tlc_.requestCounts()); + + // Expect the oldest entry to go stale. + time_system_.advanceTimeWait(std::chrono::seconds(1)); + EXPECT_EQ(RequestData(window_.count() - 1, 0), tlc_.requestCounts()); +} + +// Verify that stale historical samples are removed when they grow stale. +TEST_F(ThreadLocalControllerTest, RemoveStaleSamples2) { + fillHistorySlots(); + + // We expect a single request counted in each second of the window. + EXPECT_EQ(RequestData(window_.count(), window_.count()), tlc_.requestCounts()); + + // Let's just sit here for a full day. We expect all samples to become stale. + time_system_.advanceTimeWait(std::chrono::hours(24)); + + EXPECT_EQ(RequestData(0, 0), tlc_.requestCounts()); +} + +// Verify that historical samples are made only when there is data to record. +TEST_F(ThreadLocalControllerTest, VerifyMemoryUsage) { + // Make sure we don't add any null data to the history if there are sparse requests. + tlc_.recordSuccess(); + time_system_.advanceTimeWait(std::chrono::seconds(1)); + tlc_.recordSuccess(); + time_system_.advanceTimeWait(std::chrono::seconds(3)); + tlc_.recordSuccess(); + EXPECT_EQ(RequestData(3, 3), tlc_.requestCounts()); +} + +} // namespace +} // namespace AdmissionControl +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/integration/autonomous_upstream.cc b/test/integration/autonomous_upstream.cc index 14cf58a0cfd51..45a467dcf7e76 100644 --- a/test/integration/autonomous_upstream.cc +++ b/test/integration/autonomous_upstream.cc @@ -60,7 +60,8 @@ void AutonomousStream::sendResponse() { HeaderToInt(RESPONSE_SIZE_BYTES, response_body_length, headers); encodeHeaders(upstream_.responseHeaders(), false); - encodeData(response_body_length, true); + encodeData(response_body_length, false); + encodeTrailers(upstream_.responseTrailers()); } AutonomousHttpConnection::AutonomousHttpConnection(AutonomousUpstream& autonomous_upstream, @@ -111,12 +112,24 @@ std::unique_ptr AutonomousUpstream::lastRequestH return std::move(last_request_headers_); } +void AutonomousUpstream::setResponseTrailers( + std::unique_ptr&& response_trailers) { + Thread::LockGuard lock(headers_lock_); + response_trailers_ = std::move(response_trailers); +} + void AutonomousUpstream::setResponseHeaders( std::unique_ptr&& response_headers) { Thread::LockGuard lock(headers_lock_); response_headers_ = std::move(response_headers); } +Http::TestResponseTrailerMapImpl AutonomousUpstream::responseTrailers() { + Thread::LockGuard lock(headers_lock_); + Http::TestResponseTrailerMapImpl return_trailers = *response_trailers_; + return return_trailers; +} + Http::TestResponseHeaderMapImpl AutonomousUpstream::responseHeaders() { Thread::LockGuard lock(headers_lock_); Http::TestResponseHeaderMapImpl return_headers = *response_headers_; diff --git a/test/integration/autonomous_upstream.h b/test/integration/autonomous_upstream.h index c188344a9ec72..e9d247a4ba952 100644 --- a/test/integration/autonomous_upstream.h +++ b/test/integration/autonomous_upstream.h @@ -56,6 +56,7 @@ class AutonomousUpstream : public FakeUpstream { bool allow_incomplete_streams) : FakeUpstream(address, type, time_system), allow_incomplete_streams_(allow_incomplete_streams), + response_trailers_(std::make_unique()), response_headers_(std::make_unique( Http::TestResponseHeaderMapImpl({{":status", "200"}}))) {} @@ -64,6 +65,7 @@ class AutonomousUpstream : public FakeUpstream { Event::TestTimeSystem& time_system, bool allow_incomplete_streams) : FakeUpstream(std::move(transport_socket_factory), port, type, version, time_system), allow_incomplete_streams_(allow_incomplete_streams), + response_trailers_(std::make_unique()), response_headers_(std::make_unique( Http::TestResponseHeaderMapImpl({{":status", "200"}}))) {} @@ -77,13 +79,16 @@ class AutonomousUpstream : public FakeUpstream { void setLastRequestHeaders(const Http::HeaderMap& headers); std::unique_ptr lastRequestHeaders(); + void setResponseTrailers(std::unique_ptr&& response_trailers); void setResponseHeaders(std::unique_ptr&& response_headers); + Http::TestResponseTrailerMapImpl responseTrailers(); Http::TestResponseHeaderMapImpl responseHeaders(); const bool allow_incomplete_streams_{false}; private: Thread::MutexBasicLockable headers_lock_; std::unique_ptr last_request_headers_; + std::unique_ptr response_trailers_; std::unique_ptr response_headers_; std::vector http_connections_; std::vector shared_connections_;