diff --git a/CODEOWNERS b/CODEOWNERS index 573ac35ed2120..f768d30167679 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -274,6 +274,8 @@ extensions/upstreams/tcp @ggreenway @mattklein123 /*/extensions/compression/zstd @rainingmaster @mattklein123 # cel /*/extensions/access_loggers/filters/cel @kyessenov @douglas-reid @adisuissa +# process rate limit +/*/extensions/access_loggers/filters/process_ratelimit @taoxuy @kyessenov # health check /*/extensions/filters/http/health_check @mattklein123 @adisuissa # lua diff --git a/api/BUILD b/api/BUILD index f21103176d9da..15b16fe5ed13d 100644 --- a/api/BUILD +++ b/api/BUILD @@ -132,6 +132,7 @@ proto_library( "//envoy/data/tap/v3:pkg", "//envoy/extensions/access_loggers/file/v3:pkg", "//envoy/extensions/access_loggers/filters/cel/v3:pkg", + "//envoy/extensions/access_loggers/filters/process_ratelimit/v3:pkg", "//envoy/extensions/access_loggers/fluentd/v3:pkg", "//envoy/extensions/access_loggers/grpc/v3:pkg", "//envoy/extensions/access_loggers/open_telemetry/v3:pkg", diff --git a/api/envoy/extensions/access_loggers/filters/process_ratelimit/v3/BUILD b/api/envoy/extensions/access_loggers/filters/process_ratelimit/v3/BUILD new file mode 100644 index 0000000000000..09a37ad16b837 --- /dev/null +++ b/api/envoy/extensions/access_loggers/filters/process_ratelimit/v3/BUILD @@ -0,0 +1,12 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/core/v3:pkg", + "@com_github_cncf_xds//udpa/annotations:pkg", + ], +) diff --git a/api/envoy/extensions/access_loggers/filters/process_ratelimit/v3/process_ratelimit.proto b/api/envoy/extensions/access_loggers/filters/process_ratelimit/v3/process_ratelimit.proto new file mode 100644 index 0000000000000..6b60a691d2fcc --- /dev/null +++ b/api/envoy/extensions/access_loggers/filters/process_ratelimit/v3/process_ratelimit.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package envoy.extensions.access_loggers.filters.process_ratelimit.v3; + +import "envoy/config/core/v3/config_source.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.access_loggers.filters.process_ratelimit.v3"; +option java_outer_classname = "ProcessRatelimitProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/filters/process_ratelimit/v3;process_ratelimitv3"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: ProcessRateLimiter] +// [#extension: envoy.access_loggers.extension_filters.process_ratelimit] + +// Filters for rate limiting the access log emission using global token buckets per process and shared across all listeners. +message ProcessRateLimitFilter { + // The dynamic config for the token bucket. + DynamicTokenBucket dynamic_config = 1; +} + +message DynamicTokenBucket { + // the key used to find the token bucket in the singleton map. + string resource_name = 1 [(validate.rules).string = {min_len: 1}]; + + // The configuration source for the :ref:`token_bucket `. + // It should stay the same through the process lifetime. + config.core.v3.ConfigSource config_source = 2 [(validate.rules).message = {required: true}]; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index e2b1fe19d751c..dd9648f566a64 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -71,6 +71,7 @@ proto_library( "//envoy/data/tap/v3:pkg", "//envoy/extensions/access_loggers/file/v3:pkg", "//envoy/extensions/access_loggers/filters/cel/v3:pkg", + "//envoy/extensions/access_loggers/filters/process_ratelimit/v3:pkg", "//envoy/extensions/access_loggers/fluentd/v3:pkg", "//envoy/extensions/access_loggers/grpc/v3:pkg", "//envoy/extensions/access_loggers/open_telemetry/v3:pkg", diff --git a/changelogs/current.yaml b/changelogs/current.yaml index a55b6caec7b7c..d0ff71263732a 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -90,6 +90,10 @@ removed_config_or_runtime: Removed runtime guard ``envoy.reloadable_features.report_load_with_rq_issued`` and legacy code paths. new_features: +- area: access_log + change: | + Support process-level rate limiting on access log emission by + :ref:`ProcessRateLimitFilter `. - area: udp_sink change: | Enhance UDP sink to handle the tapped message which the size is bigger than 64K. diff --git a/source/extensions/access_loggers/filters/process_ratelimit/BUILD b/source/extensions/access_loggers/filters/process_ratelimit/BUILD new file mode 100644 index 0000000000000..bad82ab0996dd --- /dev/null +++ b/source/extensions/access_loggers/filters/process_ratelimit/BUILD @@ -0,0 +1,54 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", + "envoy_cc_library", + "envoy_extension_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_extension_package() + +envoy_cc_library( + name = "provider_singleton_lib", + srcs = ["provider_singleton.cc"], + hdrs = ["provider_singleton.h"], + deps = [ + "//envoy/event:dispatcher_interface", + "//envoy/event:timer_interface", + "//envoy/ratelimit:ratelimit_interface", + "//source/common/common:thread_synchronizer_lib", + "//source/common/common:token_bucket_impl_lib", + "//source/common/config:subscription_base_interface", + "//source/common/grpc:common_lib", + "//source/common/init:target_lib", + "//source/extensions/filters/common/local_ratelimit:local_ratelimit_lib", + "@envoy_api//envoy/type/v3:pkg_cc_proto", + ], +) + +envoy_cc_extension( + name = "filter_lib", + srcs = ["filter.cc"], + hdrs = ["filter.h"], + deps = [ + ":provider_singleton_lib", + "//envoy/access_log:access_log_interface", + "//source/common/init:target_lib", + "@envoy_api//envoy/extensions/access_loggers/filters/process_ratelimit/v3:pkg_cc_proto", + ], +) + +envoy_cc_extension( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + deps = [ + ":filter_lib", + "//envoy/access_log:access_log_interface", + "//envoy/registry", + "//source/common/access_log:access_log_lib", + "//source/common/protobuf:utility_lib", + "@envoy_api//envoy/extensions/access_loggers/filters/process_ratelimit/v3:pkg_cc_proto", + ], +) diff --git a/source/extensions/access_loggers/filters/process_ratelimit/config.cc b/source/extensions/access_loggers/filters/process_ratelimit/config.cc new file mode 100644 index 0000000000000..c301c0703dd35 --- /dev/null +++ b/source/extensions/access_loggers/filters/process_ratelimit/config.cc @@ -0,0 +1,39 @@ +#include "source/extensions/access_loggers/filters/process_ratelimit/config.h" + +#include "envoy/extensions/access_loggers/filters/process_ratelimit/v3/process_ratelimit.pb.h" + +#include "source/common/protobuf/utility.h" +#include "source/extensions/access_loggers/filters/process_ratelimit/filter.h" +#include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace Filters { +namespace ProcessRateLimit { + +AccessLog::FilterPtr ProcessRateLimitFilterFactory::createFilter( + const envoy::config::accesslog::v3::ExtensionFilter& config, + Server::Configuration::GenericFactoryContext& context) { + auto factory_config = + Config::Utility::translateToFactoryConfig(config, context.messageValidationVisitor(), *this); + const auto& process_ratelimit_config = + dynamic_cast(*factory_config); + auto filter = std::make_unique(context.serverFactoryContext(), + process_ratelimit_config); + return filter; +} + +ProtobufTypes::MessagePtr ProcessRateLimitFilterFactory::createEmptyConfigProto() { + return std::make_unique< + envoy::extensions::access_loggers::filters::process_ratelimit::v3::ProcessRateLimitFilter>(); +} + +REGISTER_FACTORY(ProcessRateLimitFilterFactory, AccessLog::ExtensionFilterFactory); + +} // namespace ProcessRateLimit +} // namespace Filters +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/filters/process_ratelimit/config.h b/source/extensions/access_loggers/filters/process_ratelimit/config.h new file mode 100644 index 0000000000000..5664975604ef4 --- /dev/null +++ b/source/extensions/access_loggers/filters/process_ratelimit/config.h @@ -0,0 +1,28 @@ +#pragma once + +#include "envoy/access_log/access_log.h" +#include "envoy/registry/registry.h" + +#include "source/common/access_log/access_log_impl.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace Filters { +namespace ProcessRateLimit { + +class ProcessRateLimitFilterFactory : public AccessLog::ExtensionFilterFactory { +public: + AccessLog::FilterPtr createFilter(const envoy::config::accesslog::v3::ExtensionFilter& config, + Server::Configuration::GenericFactoryContext& context) override; + ProtobufTypes::MessagePtr createEmptyConfigProto() override; + std::string name() const override { + return "envoy.access_loggers.extension_filters.process_ratelimit"; + } +}; + +} // namespace ProcessRateLimit +} // namespace Filters +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/filters/process_ratelimit/filter.cc b/source/extensions/access_loggers/filters/process_ratelimit/filter.cc new file mode 100644 index 0000000000000..8a4f421a2c8e4 --- /dev/null +++ b/source/extensions/access_loggers/filters/process_ratelimit/filter.cc @@ -0,0 +1,72 @@ +#include "source/extensions/access_loggers/filters/process_ratelimit/filter.h" + +#include "envoy/access_log/access_log.h" + +#include "source/common/init/target_impl.h" +#include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace Filters { +namespace ProcessRateLimit { + +ProcessRateLimitFilter::ProcessRateLimitFilter( + Server::Configuration::ServerFactoryContext& context, + const envoy::extensions::access_loggers::filters::process_ratelimit::v3::ProcessRateLimitFilter& + config) + : setter_key_(reinterpret_cast(this)), + cancel_cb_(std::make_shared>(false)), context_(context), + stats_({ALL_PROCESS_RATELIMIT_FILTER_STATS( + POOL_COUNTER_PREFIX(context.scope(), "access_log.process_ratelimit."))}) { + auto setter = + [this, cancel_cb = cancel_cb_]( + Envoy::Extensions::Filters::Common::LocalRateLimit::LocalRateLimiterSharedPtr limiter) + -> void { + if (!cancel_cb->load()) { + ENVOY_BUG(limiter != nullptr, "limiter shouldn't be null if the `limiter` is set from " + "callback."); + rate_limiter_->setLimiter(limiter); + } + }; + + if (!config.has_dynamic_config()) { + ExceptionUtil::throwEnvoyException("`dynamic_config` is required."); + } + rate_limiter_ = Envoy::Extensions::Filters::Common::LocalRateLimit::RateLimiterProviderSingleton:: + getRateLimiter(context, config.dynamic_config().resource_name(), + config.dynamic_config().config_source(), setter_key_, std::move(setter)); +} + +ProcessRateLimitFilter::~ProcessRateLimitFilter() { + // The destructor can be called in any thread. + // The `cancel_cb_` is set to true to prevent the `limiter` from being set in + // the `setter` from the main thread. + cancel_cb_->store(true); + context_.mainThreadDispatcher().post( + [limiter = std::move(rate_limiter_), setter_key = setter_key_] { + // remove the setter for this filter. + limiter->getSubscription()->removeSetter(setter_key); + }); +} + +bool ProcessRateLimitFilter::evaluate(const Formatter::Context&, + const StreamInfo::StreamInfo&) const { + ENVOY_BUG(rate_limiter_->getLimiter() != nullptr, + "rate_limiter_.limiter_ should be already set in init callback."); + Extensions::Filters::Common::LocalRateLimit::LocalRateLimiterSharedPtr limiter = + rate_limiter_->getLimiter(); + auto result = limiter->requestAllowed({}); + if (!result.allowed) { + stats_.denied_.inc(); + } else { + stats_.allowed_.inc(); + } + return result.allowed; +} + +} // namespace ProcessRateLimit +} // namespace Filters +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/filters/process_ratelimit/filter.h b/source/extensions/access_loggers/filters/process_ratelimit/filter.h new file mode 100644 index 0000000000000..4ef16daa7b34a --- /dev/null +++ b/source/extensions/access_loggers/filters/process_ratelimit/filter.h @@ -0,0 +1,49 @@ +#pragma once + +#include "envoy/access_log/access_log.h" +#include "envoy/extensions/access_loggers/filters/process_ratelimit/v3/process_ratelimit.pb.h" + +#include "source/common/init/target_impl.h" +#include "source/extensions/access_loggers/filters/process_ratelimit/provider_singleton.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace Filters { +namespace ProcessRateLimit { + +#define ALL_PROCESS_RATELIMIT_FILTER_STATS(COUNTER) \ + COUNTER(allowed) \ + COUNTER(denied) + +/** + * Struct definition for all process ratelimit filter stats. @see stats_macros.h + */ +struct ProcessRateLimitFilterStats { + ALL_PROCESS_RATELIMIT_FILTER_STATS(GENERATE_COUNTER_STRUCT) +}; +class ProcessRateLimitFilter : public AccessLog::Filter { +public: + ProcessRateLimitFilter(Server::Configuration::ServerFactoryContext& context, + const envoy::extensions::access_loggers::filters::process_ratelimit::v3:: + ProcessRateLimitFilter& config); + + bool evaluate(const Formatter::Context& log_context, + const StreamInfo::StreamInfo& stream_info) const override; + + ~ProcessRateLimitFilter() override; + +private: + const intptr_t setter_key_; + std::shared_ptr> cancel_cb_; + Server::Configuration::ServerFactoryContext& context_; + ProcessRateLimitFilterStats stats_; + mutable Envoy::Extensions::Filters::Common::LocalRateLimit::RateLimiterProviderSingleton:: + RateLimiterWrapperPtr rate_limiter_; +}; + +} // namespace ProcessRateLimit +} // namespace Filters +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/filters/process_ratelimit/provider_singleton.cc b/source/extensions/access_loggers/filters/process_ratelimit/provider_singleton.cc new file mode 100644 index 0000000000000..91320c4c181dc --- /dev/null +++ b/source/extensions/access_loggers/filters/process_ratelimit/provider_singleton.cc @@ -0,0 +1,203 @@ +#include "source/extensions/access_loggers/filters/process_ratelimit/provider_singleton.h" + +#include "source/common/grpc/common.h" + +namespace Envoy { +namespace Extensions { +namespace Filters { +namespace Common { +namespace LocalRateLimit { + +SINGLETON_MANAGER_REGISTRATION(local_ratelimit_provider); + +std::shared_ptr +createRateLimiterImpl(const envoy::type::v3::TokenBucket& token_bucket, + Event::Dispatcher& dispatcher) { + const auto fill_interval = + std::chrono::milliseconds(PROTOBUF_GET_MS_REQUIRED(token_bucket, fill_interval)); + const auto max_tokens = token_bucket.max_tokens(); + const auto tokens_per_fill = PROTOBUF_GET_WRAPPED_OR_DEFAULT(token_bucket, tokens_per_fill, 1); + return std::make_shared( + fill_interval, max_tokens, tokens_per_fill, dispatcher, + Protobuf::RepeatedPtrField< + envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>()); +} + +void RateLimiterProviderSingleton::RateLimiterWrapper::setLimiter( + LocalRateLimiterSharedPtr limiter) { + ENVOY_BUG(init_target_ != nullptr, + "init_target_ should not be null if the limiter is set from callback"); + limiter_slot_.runOnAllThreads( + [limiter, cancelled = cancelled_](OptRef thread_local_limiter) { + if (!cancelled->load()) { + thread_local_limiter->limiter = limiter; + } + }, + [init_target = init_target_.get(), cancelled = cancelled_]() { + if (!cancelled->load()) { + init_target->ready(); + } + }); +} + +RateLimiterProviderSingleton::RateLimiterWrapperPtr RateLimiterProviderSingleton::getRateLimiter( + Server::Configuration::ServerFactoryContext& factory_context, absl::string_view key, + const envoy::config::core::v3::ConfigSource& config_source, intptr_t setter_key, + SetRateLimiterCb setter) { + ASSERT_IS_MAIN_OR_TEST_THREAD(); + auto provider = factory_context.singletonManager().getTyped( + SINGLETON_MANAGER_REGISTERED_NAME(local_ratelimit_provider), + [&factory_context, &config_source] { + return std::make_shared(factory_context, config_source); + }); + + // Find the subscription for the given key. + auto it = provider->subscriptions_.find(key); + TokenBucketSubscriptionSharedPtr subscription = nullptr; + if (it == provider->subscriptions_.end()) { + // If the subscription doesn't exist, create a new one. + subscription = std::make_shared(*provider, key); + it = provider->subscriptions_.emplace(key, subscription).first; + } else { + auto exist_subscription = it->second.lock(); + ENVOY_BUG(exist_subscription != nullptr, + fmt::format("subscription for {} should not be null since it should be " + "cleaned up when the last wrapper is destroyed", + key)); + subscription = exist_subscription; + } + subscription->addSetter(setter_key, std::move(setter)); + + // If the limiter is already created, return it. + if (auto limiter = subscription->getLimiter()) { + return std::make_unique(factory_context.threadLocal(), provider, + subscription, limiter, nullptr); + } + + auto init_target = + std::make_unique(fmt::format("RateLimitConfigCallback-{}", key), []() {}); + + // Add the init target to the listener's init manager to wait for the + // resource. + factory_context.initManager().add(*init_target); + + // Otherwise, return a wrapper with a null limiter. The limiter will be + // set when the config is received. + return std::make_unique(factory_context.threadLocal(), provider, subscription, + nullptr, std::move(init_target)); +} + +std::shared_ptr +RateLimiterProviderSingleton::TokenBucketSubscription::getLimiter() { + auto limiter = limiter_.lock(); + if (limiter) { + return limiter; + } + if (config_.has_value()) { + limiter = + createRateLimiterImpl(config_.value(), parent_.factory_context_.mainThreadDispatcher()); + limiter_ = limiter; + return limiter; + } + return nullptr; +} + +RateLimiterProviderSingleton::TokenBucketSubscription::TokenBucketSubscription( + RateLimiterProviderSingleton& parent, absl::string_view resource_name) + : Config::SubscriptionBase( + parent.factory_context_.messageValidationVisitor(), ""), + parent_(parent), resource_name_(resource_name), token_bucket_config_hash_(0) { + subscription_ = THROW_OR_RETURN_VALUE( + parent.factory_context_.xdsManager().subscribeToSingletonResource( + resource_name, parent.config_source_, Grpc::Common::typeUrl(getResourceName()), + *parent.scope_, *this, resource_decoder_, {}), + Config::SubscriptionPtr); + subscription_->start({resource_name_}); +} + +RateLimiterProviderSingleton::TokenBucketSubscription::~TokenBucketSubscription() { + parent_.subscriptions_.erase(resource_name_); +} + +void RateLimiterProviderSingleton::TokenBucketSubscription::handleAddedResource( + const Config::DecodedResourceRef& resource) { + const auto& config = dynamic_cast(resource.get().resource()); + size_t new_hash = MessageUtil::hash(config); + // If the config is the same, no op. + if (new_hash == token_bucket_config_hash_) { + return; + } + + // Update the config and hash and reset the limiter. + config_ = config; + token_bucket_config_hash_ = new_hash; + auto new_limiter = createRateLimiterImpl(config, parent_.factory_context_.mainThreadDispatcher()); + limiter_ = new_limiter; + for (auto& [key, setter] : setters_) { + // The method is called on main thread while the limiter will be accessed in the worker thread + // so setter will call `runOnAllThreads` to set underneath. + setter(new_limiter); + } +} + +void RateLimiterProviderSingleton::TokenBucketSubscription::handleRemovedResource( + absl::string_view) { + // We simply reset the config and limiter here. The existing rate limiter will + // continue to work before the new config is received. + config_.reset(); + token_bucket_config_hash_ = 0; + limiter_.reset(); + + // Reset the init target as we are now waiting for a new resource. + for (auto& [key, setter] : setters_) { + setter(parent_.fallback_always_deny_limiter_); + } +} + +absl::Status RateLimiterProviderSingleton::TokenBucketSubscription::onConfigUpdate( + const std::vector& resources, const std::string&) { + ENVOY_BUG(resources.size() == 1, + fmt::format("for singleton resource subscription, only one resource should be " + "added or removed at a time but got {}", + resources.size())); + ENVOY_BUG(resources[0].get().name() == resource_name_, + fmt::format("for singleton resource subscription, the added resource name " + "should be the same as the subscription resource name but got " + "{} != {}", + resources[0].get().name(), resource_name_)); + + handleAddedResource(resources[0]); + return absl::OkStatus(); +} + +absl::Status RateLimiterProviderSingleton::TokenBucketSubscription::onConfigUpdate( + const std::vector& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, const std::string&) { + ENVOY_BUG(added_resources.size() + removed_resources.size() == 1, + fmt::format("for singleton resource subscription, only one resource should be " + "added or removed at a time but got {}", + added_resources.size() + removed_resources.size())); + if (added_resources.size() == 1) { + ENVOY_BUG(added_resources[0].get().name() == resource_name_, + fmt::format("for singleton resource subscription, the added resource name " + "should be the same as the subscription resource name but got {} " + "!= {}", + added_resources[0].get().name(), resource_name_)); + handleAddedResource(added_resources[0]); + } else { + ENVOY_BUG(removed_resources[0] == resource_name_, + fmt::format("for singleton resource subscription, the removed resource name " + "should be the same as the subscription resource name but got {} " + "!= {}", + removed_resources[0], resource_name_)); + handleRemovedResource(removed_resources[0]); + } + + return absl::OkStatus(); +} + +} // namespace LocalRateLimit +} // namespace Common +} // namespace Filters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/filters/process_ratelimit/provider_singleton.h b/source/extensions/access_loggers/filters/process_ratelimit/provider_singleton.h new file mode 100644 index 0000000000000..d3d286fa1e6e5 --- /dev/null +++ b/source/extensions/access_loggers/filters/process_ratelimit/provider_singleton.h @@ -0,0 +1,176 @@ +#pragma once + +#include +#include +#include +#include + +#include "envoy/type/v3/token_bucket.pb.h" +#include "envoy/type/v3/token_bucket.pb.validate.h" + +#include "source/common/config/subscription_base.h" +#include "source/common/init/target_impl.h" +#include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h" + +namespace Envoy { +namespace Extensions { +namespace Filters { +namespace Common { +namespace LocalRateLimit { + +// RateLimiterProviderSingleton and its child classes are used to achieve the +// rate limiter singletons shared within the process. +// +// High-level architecture: +// - RateLimiterProviderSingleton: A process-wide singleton responsible for +// managing and vending rate limiters. It holds subscriptions to token bucket +// configurations. +// - TokenBucketSubscription: Manages the subscription to a specific token +// bucket resource via xDS. It receives configuration updates and creates or +// updates the underlying LocalRateLimiterImpl. +// - LocalRateLimiterImpl: The actual rate limiter implementation based on the +// token bucket algorithm. Instances are shared among filters requesting the +// same resource. +// - RateLimiterWrapper: A wrapper class holding shared pointers to the +// provider, subscription, and limiter. This ensures that the necessary +// components remain alive as long as they are in use by any filter. +// +// Workflow: +// 1. A filter requests a rate limiter for a specific key (resource name). +// 2. RateLimiterProviderSingleton::getRateLimiter is called. +// 3. It looks up or creates a TokenBucketSubscription for the given resource +// name. +// 4. The TokenBucketSubscription establishes an xDS subscription to fetch the +// envoy::type::v3::TokenBucket. +// 5. Upon receiving the configuration, the TokenBucketSubscription creates or +// updates a shared LocalRateLimiterImpl instance. +// 6. The getRateLimiter method returns a RateLimiterWrapper, which provides +// access to the shared LocalRateLimiterImpl. The wrapper's shared pointers +// keep the subscription and provider alive. +// 7. When the configuration is updated via xDS, the TokenBucketSubscription +// updates the shared LocalRateLimiterImpl instance, affecting all filters +// using it. +// 8. When the configuration is removed via xDS, the TokenBucketSubscription +// resets the shared LocalRateLimiterImpl instance to an AlwaysDeny limiter. +// This prevents new filters from using the old limiter. +class RateLimiterProviderSingleton; +using RateLimiterProviderSingletonSharedPtr = std::shared_ptr; +class RateLimiterProviderSingleton : public Singleton::Instance { +public: + class TokenBucketSubscription; + using TokenBucketSubscriptionSharedPtr = std::shared_ptr; + struct ThreadLocalLimiter : public Envoy::ThreadLocal::ThreadLocalObject { + ThreadLocalLimiter(LocalRateLimiterSharedPtr limiter) : limiter(limiter) {} + LocalRateLimiterSharedPtr limiter = nullptr; + }; + + class RateLimiterWrapper { + public: + RateLimiterWrapper(ThreadLocal::Instance& tls, RateLimiterProviderSingletonSharedPtr provider, + TokenBucketSubscriptionSharedPtr subscription, + std::shared_ptr limiter, + std::unique_ptr init_target) + : cancelled_(std::make_shared>(false)), provider_(provider), + subscription_(subscription), limiter_slot_(tls), init_target_(std::move(init_target)) { + limiter_slot_.set([l = limiter](Envoy::Event::Dispatcher&) { + return std::make_shared(l); + }); + } + + LocalRateLimiterSharedPtr getLimiter() const { return limiter_slot_.get()->limiter; } + + void setLimiter(LocalRateLimiterSharedPtr limiter); + + ~RateLimiterWrapper() { cancelled_->store(true); } + + TokenBucketSubscriptionSharedPtr getSubscription() const { return subscription_; } + + private: + // The bool to denote if the object is deleted so we need to cancel the async setter and the + // callback. + std::shared_ptr> cancelled_; + // The `provider_` holds the ownership of this singleton by shared + // pointer, as the rate limiter map singleton isn't pinned and is + // shared among all the access log rate limit filters. + RateLimiterProviderSingletonSharedPtr provider_; + + // The `subscription_` holds the ownership of the subscription to the token + // bucket resource by shared pointer. + TokenBucketSubscriptionSharedPtr subscription_; + + // The `limiter_slot_` holds the ownership of the rate limiter(with the + // underlying token bucket) by shared pointer. Access loggers using the same + // `resource_name` of token bucket will share the same rate limiter. + // + // The `limiter_slot_` is thread-safe and can be accessed by multiple + // threads. It protects the `limiter` from being read and updated + // concurrently when listeners are active and there are new TokenBucket + // resources coming. + Envoy::ThreadLocal::TypedSlot limiter_slot_; + + // The `init_target_` is used to wait for the rate limiter to be set. It + // makes sure the access logger won't log until the rate limiter is ready. + std::unique_ptr init_target_; + }; + using RateLimiterWrapperPtr = std::unique_ptr; + + using SetRateLimiterCb = std::function; + static RateLimiterWrapperPtr + getRateLimiter(Server::Configuration::ServerFactoryContext& factory_context, + absl::string_view key, const envoy::config::core::v3::ConfigSource& config_source, + intptr_t setter_key, SetRateLimiterCb setter); + + RateLimiterProviderSingleton(Server::Configuration::ServerFactoryContext& factory_context, + const envoy::config::core::v3::ConfigSource& config_source) + : factory_context_(factory_context), config_source_(config_source), + scope_(factory_context.scope().createScope("local_ratelimit_discovery")), + fallback_always_deny_limiter_(std::make_shared()) {} + + class TokenBucketSubscription : Config::SubscriptionBase { + public: + explicit TokenBucketSubscription(RateLimiterProviderSingleton& parent, + absl::string_view resource_name); + + ~TokenBucketSubscription() override; + + void addSetter(intptr_t key, SetRateLimiterCb callback) { setters_[key] = std::move(callback); } + + void removeSetter(intptr_t key) { setters_.erase(key); } + + std::shared_ptr getLimiter(); + + // Config::SubscriptionCallbacks + absl::Status onConfigUpdate(const std::vector& resources, + const std::string&) override; + + absl::Status onConfigUpdate(const std::vector& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string&) override; + + void onConfigUpdateFailed(Config::ConfigUpdateFailureReason, const EnvoyException*) override {} + + private: + void handleAddedResource(const Config::DecodedResourceRef& resource); + void handleRemovedResource(absl::string_view resource_name); + + RateLimiterProviderSingleton& parent_; + std::string resource_name_; + Config::SubscriptionPtr subscription_; + absl::flat_hash_map setters_; + absl::optional config_; + std::weak_ptr limiter_; + size_t token_bucket_config_hash_; + }; + + Server::Configuration::ServerFactoryContext& factory_context_; + const envoy::config::core::v3::ConfigSource config_source_; + Stats::ScopeSharedPtr scope_; + LocalRateLimiterSharedPtr fallback_always_deny_limiter_; + absl::flat_hash_map> subscriptions_; +}; + +} // namespace LocalRateLimit +} // namespace Common +} // namespace Filters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index 803e15ea7c0e3..7ba3a93685b46 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -6,6 +6,7 @@ EXTENSIONS = { "envoy.access_loggers.file": "//source/extensions/access_loggers/file:config", "envoy.access_loggers.extension_filters.cel": "//source/extensions/access_loggers/filters/cel:config", + "envoy.access_loggers.extension_filters.process_ratelimit": "//source/extensions/access_loggers/filters/process_ratelimit:config", "envoy.access_loggers.fluentd" : "//source/extensions/access_loggers/fluentd:config", "envoy.access_loggers.http_grpc": "//source/extensions/access_loggers/grpc:http_config", "envoy.access_loggers.tcp_grpc": "//source/extensions/access_loggers/grpc:tcp_config", diff --git a/source/extensions/extensions_metadata.yaml b/source/extensions/extensions_metadata.yaml index df8e5dc3fdb7b..ae99362442c34 100644 --- a/source/extensions/extensions_metadata.yaml +++ b/source/extensions/extensions_metadata.yaml @@ -12,6 +12,13 @@ envoy.access_loggers.extension_filters.cel: status: alpha type_urls: - envoy.extensions.access_loggers.filters.cel.v3.ExpressionFilter +envoy.access_loggers.extension_filters.process_ratelimit: + categories: + - envoy.access_loggers.extension_filters + security_posture: unknown + status: alpha + type_urls: + - envoy.extensions.access_loggers.filters.process_ratelimit.v3.ProcessRateLimitFilter envoy.access_loggers.fluentd: categories: - envoy.access_loggers diff --git a/source/extensions/filters/common/local_ratelimit/BUILD b/source/extensions/filters/common/local_ratelimit/BUILD index 5cea645b0485c..85d0305178441 100644 --- a/source/extensions/filters/common/local_ratelimit/BUILD +++ b/source/extensions/filters/common/local_ratelimit/BUILD @@ -8,11 +8,20 @@ licenses(["notice"]) # Apache 2 envoy_extension_package() +envoy_cc_library( + name = "local_ratelimit_interface", + hdrs = ["local_ratelimit.h"], + deps = [ + "//envoy/ratelimit:ratelimit_interface", + ], +) + envoy_cc_library( name = "local_ratelimit_lib", srcs = ["local_ratelimit_impl.cc"], hdrs = ["local_ratelimit_impl.h"], deps = [ + ":local_ratelimit_interface", "//envoy/event:dispatcher_interface", "//envoy/event:timer_interface", "//envoy/ratelimit:ratelimit_interface", diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit.h b/source/extensions/filters/common/local_ratelimit/local_ratelimit.h new file mode 100644 index 0000000000000..e828b7f3ca9d0 --- /dev/null +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit.h @@ -0,0 +1,42 @@ +#pragma once + +#include +#include + +#include "envoy/ratelimit/ratelimit.h" + +namespace Envoy { +namespace Extensions { +namespace Filters { +namespace Common { +namespace LocalRateLimit { + +class TokenBucketContext { +public: + virtual ~TokenBucketContext() = default; + + virtual uint64_t maxTokens() const PURE; + virtual uint64_t remainingTokens() const PURE; + virtual uint64_t resetSeconds() const PURE; +}; + +// Interface for a local rate limiter. +class LocalRateLimiter { +public: + struct Result { + bool allowed{}; + std::shared_ptr token_bucket_context{}; + }; + + virtual ~LocalRateLimiter() = default; + + // Returns true if the request should be rate limited. + virtual Result requestAllowed(absl::Span request_descriptors) = 0; +}; +using LocalRateLimiterSharedPtr = std::shared_ptr; + +} // namespace LocalRateLimit +} // namespace Common +} // namespace Filters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h index 05adf8a558fc6..40bc8d0c0946a 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h @@ -13,6 +13,7 @@ #include "source/common/common/thread_synchronizer.h" #include "source/common/common/token_bucket_impl.h" #include "source/common/protobuf/protobuf.h" +#include "source/extensions/filters/common/local_ratelimit/local_ratelimit.h" namespace Envoy { namespace Extensions { @@ -99,15 +100,6 @@ class ShareProviderManager : public Singleton::Instance { }; using ShareProviderManagerSharedPtr = std::shared_ptr; -class TokenBucketContext { -public: - virtual ~TokenBucketContext() = default; - - virtual uint64_t maxTokens() const PURE; - virtual uint64_t remainingTokens() const PURE; - virtual uint64_t resetSeconds() const PURE; -}; - class RateLimitTokenBucket : public TokenBucketContext, public Logger::Loggable { public: @@ -133,13 +125,9 @@ class RateLimitTokenBucket : public TokenBucketContext, }; using RateLimitTokenBucketSharedPtr = std::shared_ptr; -class LocalRateLimiterImpl : public Logger::Loggable { +class LocalRateLimiterImpl : public Logger::Loggable, + public LocalRateLimiter { public: - struct Result { - bool allowed{}; - std::shared_ptr token_bucket_context{}; - }; - LocalRateLimiterImpl( const std::chrono::milliseconds fill_interval, const uint64_t max_tokens, const uint64_t tokens_per_fill, Event::Dispatcher& dispatcher, @@ -149,7 +137,8 @@ class LocalRateLimiterImpl : public Logger::Loggable request_descriptors); + LocalRateLimiter::Result + requestAllowed(absl::Span request_descriptors); private: RateLimitTokenBucketSharedPtr default_token_bucket_; @@ -164,6 +153,13 @@ class LocalRateLimiterImpl : public Logger::Loggable) override { + return {false, nullptr}; + } +}; + } // namespace LocalRateLimit } // namespace Common } // namespace Filters diff --git a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc index 8b7209eee9649..454b6fefb893b 100644 --- a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc +++ b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc @@ -115,7 +115,7 @@ FilterConfig::FilterConfig( always_consume_default_token_bucket_, std::move(share_provider), max_dynamic_descriptors_); } -Filters::Common::LocalRateLimit::LocalRateLimiterImpl::Result +Filters::Common::LocalRateLimit::LocalRateLimiter::Result FilterConfig::requestAllowed(absl::Span request_descriptors) const { return rate_limiter_->requestAllowed(request_descriptors); } diff --git a/source/extensions/filters/http/local_ratelimit/local_ratelimit.h b/source/extensions/filters/http/local_ratelimit/local_ratelimit.h index 303b2798d61ba..5019fbffbc9f0 100644 --- a/source/extensions/filters/http/local_ratelimit/local_ratelimit.h +++ b/source/extensions/filters/http/local_ratelimit/local_ratelimit.h @@ -86,7 +86,7 @@ class FilterConfig : public Router::RouteSpecificFilterConfig, } const LocalInfo::LocalInfo& localInfo() const { return local_info_; } Runtime::Loader& runtime() { return runtime_; } - Filters::Common::LocalRateLimit::LocalRateLimiterImpl::Result + Filters::Common::LocalRateLimit::LocalRateLimiter::Result requestAllowed(absl::Span request_descriptors) const; bool enabled() const; bool enforced() const; @@ -196,7 +196,7 @@ class Filter : public Http::PassThroughFilter, Logger::Loggable request_descriptors); FilterConfigSharedPtr config_; diff --git a/test/extensions/access_loggers/filters/process_ratelimit/BUILD b/test/extensions/access_loggers/filters/process_ratelimit/BUILD new file mode 100644 index 0000000000000..09a81b385a53c --- /dev/null +++ b/test/extensions/access_loggers/filters/process_ratelimit/BUILD @@ -0,0 +1,60 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_extension_cc_test( + name = "access_log_impl_test", + srcs = ["access_log_impl_test.cc"], + extension_names = ["envoy.access_loggers.extension_filters.process_ratelimit"], + rbe_pool = "6gig", + deps = [ + "//source/common/access_log:access_log_lib", + "//source/common/formatter:formatter_extension_lib", + "//source/common/stream_info:utility_lib", + "//source/extensions/access_loggers/file:config", + "//source/extensions/access_loggers/filters/process_ratelimit:config", + "//source/extensions/config_subscription/filesystem:filesystem_subscription_lib", + "//test/common/stream_info:test_util", + "//test/common/upstream:utility_lib", + "//test/mocks/access_log:access_log_mocks", + "//test/mocks/event:event_mocks", + "//test/mocks/filesystem:filesystem_mocks", + "//test/mocks/router:router_mocks", + "//test/mocks/runtime:runtime_mocks", + "//test/mocks/server:factory_context_mocks", + "//test/mocks/upstream:cluster_info_mocks", + "//test/test_common:environment_lib", + "//test/test_common:registry_lib", + "//test/test_common:simulated_time_system_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto", + "@envoy_api//envoy/type/v3:pkg_cc_proto", + ], +) + +envoy_extension_cc_test( + name = "integration_test", + size = "large", + srcs = ["integration_test.cc"], + extension_names = ["envoy.access_loggers.extension_filters.process_ratelimit"], + rbe_pool = "6gig", + deps = [ + "//source/extensions/access_loggers/filters/process_ratelimit:config", + "//source/extensions/formatter/cel:config", + "//test/integration:http_integration_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/config/accesslog/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/access_loggers/file/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/access_loggers/filters/process_ratelimit/v3:pkg_cc_proto", + "@envoy_api//envoy/type/v3:pkg_cc_proto", + ], +) diff --git a/test/extensions/access_loggers/filters/process_ratelimit/access_log_impl_test.cc b/test/extensions/access_loggers/filters/process_ratelimit/access_log_impl_test.cc new file mode 100644 index 0000000000000..dea8a766bad9e --- /dev/null +++ b/test/extensions/access_loggers/filters/process_ratelimit/access_log_impl_test.cc @@ -0,0 +1,399 @@ +#include +#include +#include +#include + +#include "envoy/common/optref.h" +#include "envoy/config/accesslog/v3/accesslog.pb.h" +#include "envoy/config/accesslog/v3/accesslog.pb.validate.h" +#include "envoy/config/subscription.h" +#include "envoy/event/dispatcher.h" +#include "envoy/stats/scope.h" +#include "envoy/type/v3/token_bucket.pb.h" +#include "envoy/type/v3/token_bucket.pb.validate.h" + +#include "source/common/access_log/access_log_impl.h" +#include "source/common/protobuf/message_validator_impl.h" + +#include "test/common/stream_info/test_util.h" +#include "test/mocks/access_log/mocks.h" +#include "test/mocks/config/mocks.h" +#include "test/mocks/event/mocks.h" +#include "test/mocks/init/mocks.h" +#include "test/mocks/server/factory_context.h" +#include "test/test_common/printers.h" +#include "test/test_common/simulated_time_system.h" +#include "test/test_common/utility.h" + +#include "absl/container/flat_hash_map.h" +#include "absl/status/statusor.h" +#include "absl/strings/string_view.h" +#include "gmock/gmock.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace Filters { +namespace ProcessRateLimit { +namespace { + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; +using testing::SaveArg; + +envoy::config::accesslog::v3::AccessLog parseAccessLogFromV3Yaml(const std::string& yaml) { + envoy::config::accesslog::v3::AccessLog access_log; + TestUtility::loadFromYamlAndValidate(yaml, access_log); + return access_log; +} + +class AccessLogImplTestWithRateLimitFilter : public Event::TestUsingSimulatedTime, + public testing::Test { +public: + AccessLogImplTestWithRateLimitFilter() + : stream_info_(time_source_), file_(new AccessLog::MockAccessLogFile()) { + ON_CALL(context_.server_factory_context_, runtime()).WillByDefault(ReturnRef(runtime_)); + ON_CALL(context_.server_factory_context_, accessLogManager()) + .WillByDefault(ReturnRef(log_manager_)); + ON_CALL(log_manager_, createAccessLog(_)).WillByDefault(Return(file_)); + ON_CALL(context_.server_factory_context_, scope()).WillByDefault(ReturnRef(context_.scope())); + ON_CALL(*file_, write(_)).WillByDefault(SaveArg<0>(&output_)); + stream_info_.addBytesReceived(1); + stream_info_.addBytesSent(2); + stream_info_.protocol(Http::Protocol::Http11); + // Clear default stream id provider. + stream_info_.stream_id_provider_ = nullptr; + time_system_ = new Envoy::Event::SimulatedTimeSystem(); + context_.server_factory_context_.dispatcher_.time_system_.reset(time_system_); + + ON_CALL(context_.server_factory_context_.xds_manager_, + subscribeToSingletonResource(_, _, _, _, _, _, _)) + .WillByDefault(Invoke( + [this](absl::string_view resource_name, + OptRef, absl::string_view, + Stats::Scope&, Config::SubscriptionCallbacks& callbacks, + Config::OpaqueResourceDecoderSharedPtr, + const Config::SubscriptionOptions&) -> absl::StatusOr { + auto ret = std::make_unique>(); + subscriptions_[resource_name] = ret.get(); + callbackss_[resource_name] = &callbacks; + return ret; + })); + + ON_CALL(context_.server_factory_context_.init_manager_, add(_)) + .WillByDefault(Invoke([this](const Init::Target& target) { + init_target_handles_.push_back(target.createHandle("test")); + })); + + ON_CALL(context_.server_factory_context_.init_manager_, initialize(_)) + .WillByDefault(Invoke([this](const Init::Watcher& watcher) { + while (!init_target_handles_.empty()) { + init_target_handles_.back()->initialize(watcher); + init_target_handles_.pop_back(); + } + })); + } + +protected: + void expectWritesAndLog(AccessLog::InstanceSharedPtr log, int expect_write_times, + int log_call_times) { + EXPECT_CALL(*file_, write(_)).Times(expect_write_times); + for (int i = 0; i < log_call_times; ++i) { + log->log({&request_headers_, &response_headers_, &response_trailers_}, stream_info_); + } + } + + const std::string default_access_log_ = R"EOF( +name: accesslog +filter: + extension_filter: + name: local_ratelimit_extension_filter + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.filters.process_ratelimit.v3.ProcessRateLimitFilter + dynamic_config: + resource_name: "token_bucket_name" + config_source: + ads: {} +typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: /dev/null + )EOF"; + + const envoy::type::v3::TokenBucket token_bucket_resource_ = + TestUtility::parseYaml(R"EOF( +max_tokens: 1 +tokens_per_fill: 1 +fill_interval: + seconds: 1 +)EOF"); + + NiceMock time_source_; + Http::TestRequestHeaderMapImpl request_headers_{{":method", "GET"}, {":path", "/"}}; + Http::TestResponseHeaderMapImpl response_headers_; + Http::TestResponseTrailerMapImpl response_trailers_; + TestStreamInfo stream_info_; + std::shared_ptr file_; + StringViewSaver output_; + + NiceMock runtime_; + NiceMock log_manager_; + NiceMock context_; + Envoy::Event::SimulatedTimeSystem* time_system_; + + absl::flat_hash_map subscriptions_; + absl::flat_hash_map callbackss_; + std::vector init_target_handles_; + NiceMock init_watcher_; +}; + +TEST_F(AccessLogImplTestWithRateLimitFilter, InvalidConfigWithEmptyDynamicConfig) { + const std::string invalid_access_log = R"EOF( +name: accesslog +filter: + extension_filter: + name: local_ratelimit_extension_filter + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.filters.process_ratelimit.v3.ProcessRateLimitFilter + dynamic_config: +typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: /dev/null + )EOF"; + EXPECT_THROW(AccessLog::AccessLogFactory::fromProto(parseAccessLogFromV3Yaml(invalid_access_log), + context_), + EnvoyException); +} + +TEST_F(AccessLogImplTestWithRateLimitFilter, FilterDestructedBeforeCallback) { + AccessLog::InstanceSharedPtr log1 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + // log2 is to hold the provider singleton alive. + AccessLog::InstanceSharedPtr log2 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + context_.server_factory_context_.init_manager_.initialize(init_watcher_); + ASSERT_EQ(subscriptions_.size(), 1); + ASSERT_EQ(callbackss_.size(), 1); + + // Destruct the log object, which destructs the filter. + log1.reset(); + + // Now, simulate the config update arriving. The lambda captured in + // getRateLimiter should handle the filter being gone. + EXPECT_CALL(init_watcher_, ready()); + const auto decoded_resources = TestUtility::decodeResources( + {{"token_bucket_name", token_bucket_resource_}}); + EXPECT_TRUE(callbackss_["token_bucket_name"]->onConfigUpdate(decoded_resources.refvec_, "").ok()); + + // No crash should occur. The main thing we are testing is that the callback + // doesn't try to access any members of the destructed filter. +} + +TEST_F(AccessLogImplTestWithRateLimitFilter, HappyPath) { + AccessLog::InstanceSharedPtr log = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + context_.server_factory_context_.init_manager_.initialize(init_watcher_); + ASSERT_EQ(subscriptions_.size(), 1); + ASSERT_EQ(callbackss_.size(), 1); + + EXPECT_CALL(init_watcher_, ready()); + const auto decoded_resources = TestUtility::decodeResources( + {{"token_bucket_name", token_bucket_resource_}}); + EXPECT_TRUE(callbackss_["token_bucket_name"]->onConfigUpdate(decoded_resources.refvec_, "").ok()); + + // First log is written, second is rate limited. + expectWritesAndLog(log, /*expect_write_times=*/1, /*log_call_times=*/2); + EXPECT_EQ(context_.scope().counterFromString("access_log.process_ratelimit.allowed").value(), 1); + + EXPECT_EQ(context_.scope().counterFromString("access_log.process_ratelimit.denied").value(), 1); + + time_system_->setMonotonicTime(MonotonicTime(std::chrono::seconds(1))); + // Third log is written, fourth is rate limited. + expectWritesAndLog(log, /*expect_write_times=*/1, /*log_call_times=*/2); + EXPECT_EQ(context_.scope().counterFromString("access_log.process_ratelimit.allowed").value(), 2); + + EXPECT_EQ(context_.scope().counterFromString("access_log.process_ratelimit.denied").value(), 2); +} + +TEST_F(AccessLogImplTestWithRateLimitFilter, SharedTokenBucketInitTogether) { + AccessLog::InstanceSharedPtr log1 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + AccessLog::InstanceSharedPtr log2 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + ASSERT_EQ(subscriptions_.size(), 1); + ASSERT_EQ(callbackss_.size(), 1); + + const auto decoded_resources = TestUtility::decodeResources( + {{"token_bucket_name", token_bucket_resource_}}); + EXPECT_TRUE(callbackss_["token_bucket_name"]->onConfigUpdate(decoded_resources.refvec_, "").ok()); + + expectWritesAndLog(log1, /*expect_write_times=*/1, /*log_call_times=*/1); + expectWritesAndLog(log2, /*expect_write_times=*/0, /*log_call_times=*/1); + + time_system_->setMonotonicTime(MonotonicTime(std::chrono::seconds(1))); + expectWritesAndLog(log2, /*expect_write_times=*/1, /*log_call_times=*/1); + expectWritesAndLog(log1, /*expect_write_times=*/0, /*log_call_times=*/1); +} + +TEST_F(AccessLogImplTestWithRateLimitFilter, SharedTokenBucketInitSeparately) { + AccessLog::InstanceSharedPtr log1 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + context_.server_factory_context_.init_manager_.initialize(init_watcher_); + ASSERT_EQ(subscriptions_.size(), 1); + ASSERT_EQ(callbackss_.size(), 1); + + EXPECT_CALL(init_watcher_, ready()); + const auto decoded_resources = TestUtility::decodeResources( + {{"token_bucket_name", token_bucket_resource_}}); + EXPECT_TRUE(callbackss_["token_bucket_name"]->onConfigUpdate(decoded_resources.refvec_, "").ok()); + expectWritesAndLog(log1, /*expect_write_times=*/1, /*log_call_times=*/1); + + // Init the second log with the same token bucket. + AccessLog::InstanceSharedPtr log2 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + context_.server_factory_context_.init_manager_.initialize(init_watcher_); + expectWritesAndLog(log2, /*expect_write_times=*/0, /*log_call_times=*/1); + time_system_->setMonotonicTime(MonotonicTime(std::chrono::seconds(1))); + expectWritesAndLog(log2, /*expect_write_times=*/1, /*log_call_times=*/1); + expectWritesAndLog(log1, /*expect_write_times=*/0, /*log_call_times=*/1); +} + +TEST_F(AccessLogImplTestWithRateLimitFilter, TokenBucketUpdatedUnderSameResourceName) { + AccessLog::InstanceSharedPtr log1 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + context_.server_factory_context_.init_manager_.initialize(init_watcher_); + ASSERT_EQ(subscriptions_.size(), 1); + ASSERT_EQ(callbackss_.size(), 1); + + EXPECT_CALL(init_watcher_, ready()); + const auto decoded_resources = TestUtility::decodeResources( + {{"token_bucket_name", token_bucket_resource_}}); + EXPECT_TRUE(callbackss_["token_bucket_name"]->onConfigUpdate(decoded_resources.refvec_, "").ok()); + expectWritesAndLog(log1, /*expect_write_times=*/1, /*log_call_times=*/2); + + const auto decoded_resources_2 = TestUtility::decodeResources( + {{"token_bucket_name", TestUtility::parseYaml(R"EOF( +max_tokens: 2 +tokens_per_fill: 2 +fill_interval: + seconds: 1 +)EOF")}}); + EXPECT_TRUE( + callbackss_["token_bucket_name"]->onConfigUpdate(decoded_resources_2.refvec_, "").ok()); + // Init the second log with the same token bucket. + AccessLog::InstanceSharedPtr log2 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + context_.server_factory_context_.init_manager_.initialize(init_watcher_); + // The new token bucket allows 2 writes per second. We call log 3 times. + expectWritesAndLog(log2, /*expect_write_times=*/2, /*log_call_times=*/3); +} + +TEST_F(AccessLogImplTestWithRateLimitFilter, RemoveAndAddResource) { + AccessLog::InstanceSharedPtr log1 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + context_.server_factory_context_.init_manager_.initialize(init_watcher_); + ASSERT_EQ(subscriptions_.size(), 1); + ASSERT_EQ(callbackss_.size(), 1); + + // 1. Add the resource + EXPECT_CALL(init_watcher_, ready()); + auto decoded_resources = TestUtility::decodeResources( + {{"token_bucket_name", token_bucket_resource_}}); + EXPECT_TRUE(callbackss_["token_bucket_name"]->onConfigUpdate(decoded_resources.refvec_, "").ok()); + expectWritesAndLog(log1, /*expect_write_times=*/1, /*log_call_times=*/2); + + time_system_->setMonotonicTime(MonotonicTime(std::chrono::seconds(1))); + + // 2. Remove the token bucket. + Protobuf::RepeatedPtrField removed_resources; + removed_resources.Add("token_bucket_name"); + EXPECT_TRUE(callbackss_["token_bucket_name"]->onConfigUpdate({}, removed_resources, "").ok()); + // The rate limiter should always deny. + expectWritesAndLog(log1, /*expect_write_times=*/0, /*log_call_times=*/1); + + // 3. Add the resource back. + auto new_token_bucket = TestUtility::parseYaml(R"EOF( +max_tokens: 3 +tokens_per_fill: 3 +fill_interval: + seconds: 3 +)EOF"); + decoded_resources = TestUtility::decodeResources( + {{"token_bucket_name", new_token_bucket}}); + EXPECT_TRUE( + callbackss_["token_bucket_name"]->onConfigUpdate(decoded_resources.refvec_, {}, "").ok()); + // The rate limiter should be working with the new config. + // time_system_->setMonotonicTime(MonotonicTime(std::chrono::seconds(4))); + expectWritesAndLog(log1, /*expect_write_times=*/3, /*log_call_times=*/4); + + // A new log instance should also pick up the re-added config. + AccessLog::InstanceSharedPtr log2 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + context_.server_factory_context_.init_manager_.initialize(init_watcher_); + // It shares the same token bucket, so it's rate limited. + expectWritesAndLog(log2, /*expect_write_times=*/0, /*log_call_times=*/1); + + time_system_->setMonotonicTime(MonotonicTime(std::chrono::seconds(4))); + expectWritesAndLog(log2, /*expect_write_times=*/3, /*log_call_times=*/4); +} + +TEST_F(AccessLogImplTestWithRateLimitFilter, + RemoveResourceAndGetTokenBucketBeforeNewResourceAdded) { + AccessLog::InstanceSharedPtr log1 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + context_.server_factory_context_.init_manager_.initialize(init_watcher_); + ASSERT_EQ(subscriptions_.size(), 1); + ASSERT_EQ(callbackss_.size(), 1); + + // 1. Add the resource + EXPECT_CALL(init_watcher_, ready()); + auto decoded_resources = TestUtility::decodeResources( + {{"token_bucket_name", token_bucket_resource_}}); + EXPECT_TRUE(callbackss_["token_bucket_name"]->onConfigUpdate(decoded_resources.refvec_, "").ok()); + expectWritesAndLog(log1, /*expect_write_times=*/1, /*log_call_times=*/2); + + time_system_->setMonotonicTime(MonotonicTime(std::chrono::seconds(1))); + + // 2. Remove the token bucket. + Protobuf::RepeatedPtrField removed_resources; + removed_resources.Add("token_bucket_name"); + EXPECT_TRUE(callbackss_["token_bucket_name"]->onConfigUpdate({}, removed_resources, "").ok()); + // The rate limiter should always deny. + expectWritesAndLog(log1, /*expect_write_times=*/0, /*log_call_times=*/1); + + // A new log instance should also pick up the re-added config. + EXPECT_CALL(init_watcher_, ready()).Times(0); + AccessLog::InstanceSharedPtr log2 = AccessLog::AccessLogFactory::fromProto( + parseAccessLogFromV3Yaml(default_access_log_), context_); + context_.server_factory_context_.init_manager_.initialize(init_watcher_); + + // 3. Add the resource back. + EXPECT_CALL(init_watcher_, ready()); + auto new_token_bucket = TestUtility::parseYaml(R"EOF( +max_tokens: 3 +tokens_per_fill: 3 +fill_interval: + seconds: 3 +)EOF"); + + decoded_resources = TestUtility::decodeResources( + {{"token_bucket_name", new_token_bucket}}); + EXPECT_TRUE( + callbackss_["token_bucket_name"]->onConfigUpdate(decoded_resources.refvec_, {}, "").ok()); + // The rate limiter should be working with the new config. + expectWritesAndLog(log1, /*expect_write_times=*/3, /*log_call_times=*/4); + // It shares the same token bucket, so it's rate limited. + expectWritesAndLog(log2, /*expect_write_times=*/0, /*log_call_times=*/1); + + time_system_->setMonotonicTime(MonotonicTime(std::chrono::seconds(4))); + expectWritesAndLog(log2, /*expect_write_times=*/3, /*log_call_times=*/4); +} + +} // namespace +} // namespace ProcessRateLimit +} // namespace Filters +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/access_loggers/filters/process_ratelimit/integration_test.cc b/test/extensions/access_loggers/filters/process_ratelimit/integration_test.cc new file mode 100644 index 0000000000000..9cc0d28427af3 --- /dev/null +++ b/test/extensions/access_loggers/filters/process_ratelimit/integration_test.cc @@ -0,0 +1,119 @@ +#include "envoy/config/accesslog/v3/accesslog.pb.validate.h" +#include "envoy/extensions/access_loggers/file/v3/file.pb.h" +#include "envoy/extensions/access_loggers/filters/process_ratelimit/v3/process_ratelimit.pb.h" +#include "envoy/type/v3/token_bucket.pb.h" + +#include "source/common/protobuf/protobuf.h" + +#include "test/integration/http_integration.h" +#include "test/test_common/environment.h" +#include "test/test_common/registry.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" + +using testing::HasSubstr; + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace Filters { +namespace ProcessRateLimit { +namespace { + +class AccessLogIntegrationTest : public testing::TestWithParam, + public HttpIntegrationTest { +public: + AccessLogIntegrationTest() : HttpIntegrationTest(Http::CodecType::HTTP1, GetParam()) {} +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, AccessLogIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +envoy::config::accesslog::v3::AccessLog parseAccessLogFromV3Yaml(const std::string& yaml) { + envoy::config::accesslog::v3::AccessLog access_log; + TestUtility::loadFromYamlAndValidate(yaml, access_log); + return access_log; +} + +TEST_P(AccessLogIntegrationTest, AccessLogLocalRateLimitFilter) { + const std::string token_bucket_path = TestEnvironment::temporaryPath(fmt::format( + "token_bucket_{}_{}.yaml", version_ == Network::Address::IpVersion::v4 ? "v4" : "v6", + TestUtility::uniqueFilename())); + TestEnvironment::writeStringToFileForTest(token_bucket_path, R"EOF( +version_info: "123" +resources: +- "@type": type.googleapis.com/envoy.service.discovery.v3.Resource + name: "token_bucket_name" + version: "100" + resource: + "@type": type.googleapis.com/envoy.type.v3.TokenBucket + max_tokens: 3 + tokens_per_fill: 1 + fill_interval: + seconds: 1 +)EOF", + true); + + const std::string access_log_path = TestEnvironment::temporaryPath( + fmt::format("access_log_{}_{}.txt", version_ == Network::Address::IpVersion::v4 ? "v4" : "v6", + TestUtility::uniqueFilename())); + + config_helper_.addConfigModifier( + [token_bucket_path, access_log_path]( + envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) { + const std::string access_log_yaml = fmt::format(R"EOF( +name: accesslog +filter: + extension_filter: + name: local_ratelimit_extension_filter + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.filters.process_ratelimit.v3.ProcessRateLimitFilter + dynamic_config: + resource_name: "token_bucket_name" + config_source: + path_config_source: + path: "{}" + resource_api_version: V3 +typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: "{}" +)EOF", + token_bucket_path, access_log_path); + auto* access_log1 = hcm.add_access_log(); + *access_log1 = parseAccessLogFromV3Yaml(access_log_yaml); + auto* access_log2 = hcm.add_access_log(); + *access_log2 = parseAccessLogFromV3Yaml(access_log_yaml); + }); + + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + + sendRequestAndWaitForResponse(default_request_headers_, 0, default_response_headers_, 0, 0); + sendRequestAndWaitForResponse(default_request_headers_, 0, default_response_headers_, 0, 0); + cleanupUpstreamAndDownstream(); + + auto entries = waitForAccessLogEntries(access_log_path, nullptr); + // We have 4 access logs triggered but 1 got rate limited. + EXPECT_EQ(3, entries.size()); + + timeSystem().advanceTimeWait(std::chrono::seconds(2)); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + sendRequestAndWaitForResponse(default_request_headers_, 0, default_response_headers_, 0, 0); + sendRequestAndWaitForResponse(default_request_headers_, 0, default_response_headers_, 0, 0); + cleanupUpstreamAndDownstream(); + entries = waitForAccessLogEntries(access_log_path, nullptr); + // We have another 4 access logs triggered but 1 got rate limited. + EXPECT_EQ(6, entries.size()); +} + +} // namespace +} // namespace ProcessRateLimit +} // namespace Filters +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc b/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc index 637ee43e6ca6a..80cdf64605c9c 100644 --- a/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc +++ b/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc @@ -425,7 +425,7 @@ TEST_F(LocalRateLimiterImplTest, AtomicTokenBucketMultipleTokensPerFillWithShare // Verify token bucket functionality with max tokens > tokens per fill. TEST_F(LocalRateLimiterImplTest, AtomicTokenBucketMaxTokensGreaterThanTokensPerFill) { - initializeWithAtomicTokenBucket(std::chrono::milliseconds(200), 2, 1); + initializeWithAtomicTokenBucket(std::chrono::milliseconds(200), 2, 1, nullptr); // 2 -> 0 tokens EXPECT_TRUE(rate_limiter_->requestAllowed(route_descriptors_).allowed); diff --git a/test/integration/base_integration_test.cc b/test/integration/base_integration_test.cc index 22ff417048e5e..6e88fd0c68e88 100644 --- a/test/integration/base_integration_test.cc +++ b/test/integration/base_integration_test.cc @@ -535,25 +535,19 @@ void BaseIntegrationTest::useListenerAccessLog(absl::string_view format) { listener_access_log_name_ = TestEnvironment::temporaryPath(TestUtility::uniqueFilename()); ASSERT_TRUE(config_helper_.setListenerAccessLog(listener_access_log_name_, format)); } - -std::string BaseIntegrationTest::waitForAccessLog(const std::string& filename, uint32_t entry, - bool allow_excess_entries, - Network::ClientConnection* client_connection) { - +std::vector +BaseIntegrationTest::waitForAccessLogEntries(const std::string& filename, + Network::ClientConnection* client_connection, + absl::optional min_entries) { // Wait a max of 1s for logs to flush to disk. std::string contents; + std::vector entries; const int num_iterations = TIMEOUT_FACTOR * 1000; for (int i = 0; i < num_iterations; ++i) { contents = TestEnvironment::readFileToStringForTest(filename); - std::vector entries = absl::StrSplit(contents, '\n', absl::SkipEmpty()); - if (entries.size() >= entry + 1) { - // Often test authors will waitForAccessLog() for multiple requests, and - // not increment the entry number for the second wait. Guard against that. - EXPECT_TRUE(allow_excess_entries || entries.size() == entry + 1) - << "Waiting for entry index " << entry << " but it was not the last entry as there were " - << entries.size() << "\n" - << contents; - return entries[entry]; + entries = absl::StrSplit(contents, '\n', absl::SkipEmpty()); + if (min_entries.has_value() && entries.size() >= min_entries.value()) { + return entries; } if (i % 25 == 0 && client_connection != nullptr) { // The QUIC default delayed ack timer is 25ms. Wait for any pending ack timers to expire, @@ -562,8 +556,26 @@ std::string BaseIntegrationTest::waitForAccessLog(const std::string& filename, u } absl::SleepFor(absl::Milliseconds(1)); } - RELEASE_ASSERT(0, absl::StrCat("Timed out waiting for access log. Found: '", contents, "'")); - return ""; + if (min_entries.has_value()) { + RELEASE_ASSERT(0, absl::StrCat("Timed out waiting for access log. Found: '", contents, "'")); + } + return entries; +} + +std::string BaseIntegrationTest::waitForAccessLog(const std::string& filename, uint32_t entry, + bool allow_excess_entries, + Network::ClientConnection* client_connection) { + std::vector entries = + waitForAccessLogEntries(filename, client_connection, entry + 1); + + // Often test authors will waitForAccessLog() for multiple requests, and + // not increment the entry number for the second wait. Guard against that. + EXPECT_TRUE(allow_excess_entries || entries.size() == entry + 1) + << "Waiting for entry index " << entry << " but it was not the last entry as there were " + << entries.size() << "\n" + << absl::StrJoin(entries, "\n"); + RELEASE_ASSERT(entries.size() > entry, absl::StrCat("Log entry ", entry, " not found.")); + return entries[entry]; } void BaseIntegrationTest::createXdsUpstream() { diff --git a/test/integration/base_integration_test.h b/test/integration/base_integration_test.h index 0d54d3b6cf090..ff412d78bc4bb 100644 --- a/test/integration/base_integration_test.h +++ b/test/integration/base_integration_test.h @@ -154,6 +154,15 @@ class BaseIntegrationTest : protected Logger::Loggable { // Enable the listener access log void useListenerAccessLog(absl::string_view format = ""); + + // Returns all log entries after the nth access log entry, defaulting to log entry 0. + // By default will trigger an expect failure if more than one entry is returned. + // If client_connection is provided, flush pending acks to enable deferred logging. + std::vector + waitForAccessLogEntries(const std::string& filename, + Network::ClientConnection* client_connection = nullptr, + absl::optional min_entries = std::nullopt); + // Returns all log entries after the nth access log entry, defaulting to log entry 0. // By default will trigger an expect failure if more than one entry is returned. // If client_connection is provided, flush pending acks to enable deferred logging. diff --git a/test/test_common/utility.h b/test/test_common/utility.h index 8192ae549d94d..75d2b0268519c 100644 --- a/test/test_common/utility.h +++ b/test/test_common/utility.h @@ -699,6 +699,19 @@ class TestUtility { return decoded_resources; } + template + static Config::DecodedResourcesWrapper + decodeResources(absl::flat_hash_map resources) { + Config::DecodedResourcesWrapper decoded_resources; + for (const auto& [name, resource] : resources) { + auto owned_resource = std::make_unique(resource); + decoded_resources.owned_resources_.emplace_back( + new Config::DecodedResourceImpl(std::move(owned_resource), name, {}, "")); + decoded_resources.refvec_.emplace_back(*decoded_resources.owned_resources_.back()); + } + return decoded_resources; + } + template static Config::DecodedResourcesWrapper decodeResources(const Protobuf::RepeatedPtrField& resources,