From 7fcfe79e9244ae57fbe70212325c131f1ec66c87 Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Wed, 6 Mar 2019 10:30:51 -0500 Subject: [PATCH] config: implement incremental CDS in Envoy Signed-off-by: Fred Douglas --- api/XDS_PROTOCOL.md | 42 ++- api/envoy/api/v2/cds.proto | 3 +- api/envoy/api/v2/core/config_source.proto | 7 + api/envoy/api/v2/discovery.proto | 60 ++-- api/envoy/api/v2/rds.proto | 3 +- api/envoy/service/discovery/v2/ads.proto | 4 +- include/envoy/config/BUILD | 1 + include/envoy/config/subscription.h | 17 + source/common/config/BUILD | 17 + .../common/config/delta_subscription_impl.h | 213 ++++++++++++ source/common/config/grpc_stream.h | 10 +- source/common/config/subscription_factory.h | 15 +- source/common/config/utility.cc | 23 +- source/common/router/rds_impl.h | 5 + source/common/secret/sds_api.h | 5 + .../common/stats/stat_data_allocator_impl.h | 2 +- source/common/upstream/cds_api_impl.cc | 60 ++-- source/common/upstream/cds_api_impl.h | 7 +- source/common/upstream/cluster_manager_impl.h | 1 + source/common/upstream/eds.h | 5 + source/server/lds_api.h | 5 + source/server/server.cc | 1 - .../config/config_provider_impl_test.cc | 5 + .../config/subscription_factory_test.cc | 6 +- test/common/config/utility_test.cc | 16 +- test/common/upstream/cds_api_impl_test.cc | 35 +- test/integration/BUILD | 21 ++ .../integration/delta_cds_integration_test.cc | 324 ++++++++++++++++++ .../delta_xds_integration_test_base.cc | 91 +++++ .../delta_xds_integration_test_base.h | 47 +++ test/integration/hds_integration_test.cc | 1 - test/integration/http_integration.cc | 8 +- test/integration/http_integration.h | 6 +- test/integration/integration.cc | 47 +++ test/integration/integration.h | 23 ++ test/mocks/config/mocks.h | 5 + tools/spelling_dictionary.txt | 2 + 37 files changed, 1016 insertions(+), 127 deletions(-) create mode 100644 source/common/config/delta_subscription_impl.h create mode 100644 test/integration/delta_cds_integration_test.cc create mode 100644 test/integration/delta_xds_integration_test_base.cc create mode 100644 test/integration/delta_xds_integration_test_base.h diff --git a/api/XDS_PROTOCOL.md b/api/XDS_PROTOCOL.md index fe0362ef16f18..c7587b2f283ce 100644 --- a/api/XDS_PROTOCOL.md +++ b/api/XDS_PROTOCOL.md @@ -288,35 +288,33 @@ admin: ### Incremental xDS -Incremental xDS is a separate xDS endpoint available for ADS, CDS and RDS that -allows: - - * Incremental updates of the list of tracked resources by the xDS client. - This supports Envoy on-demand / lazily requesting additional resources. For - example, this may occur when a request corresponding to an unknown cluster - arrives. - * The xDS server can incrementally update the resources on the client. - This supports the goal of scalability of xDS resources. Rather than deliver - all 100k clusters when a single cluster is modified, the management server - only needs to deliver the single cluster that changed. - -An xDS incremental session is always in the context of a gRPC bidirectional +Incremental xDS is a separate xDS endpoint that: + + * Allows the protocol to communicate on the wire in terms of resource/resource + name deltas ("Delta xDS"). This supports the goal of scalability of xDS + resources. Rather than deliver all 100k clusters when a single cluster is + modified, the management server only needs to deliver the single cluster + that changed. + * Allows the Envoy to on-demand / lazily request additional resources. For + example, requesting a cluster only when a request for that cluster arrives. + +An Incremental xDS session is always in the context of a gRPC bidirectional stream. This allows the xDS server to keep track of the state of xDS clients -connected to it. There is no REST version of Incremental xDS. +connected to it. There is no REST version of Incremental xDS yet. -In incremental xDS the nonce field is required and used to pair a -[`IncrementalDiscoveryResponse`](https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/discovery.proto#discoveryrequest) -to a [`IncrementalDiscoveryRequest`](https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/discovery.proto#discoveryrequest) +In the delta xDS wire protocol, the nonce field is required and used to pair a +[`DeltaDiscoveryResponse`](https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/discovery.proto#deltadiscoveryresponse) +to a [`DeltaDiscoveryRequest`](https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/discovery.proto#deltadiscoveryrequest) ACK or NACK. Optionally, a response message level system_version_info is present for debugging purposes only. -`IncrementalDiscoveryRequest` can be sent in 3 situations: +`DeltaDiscoveryRequest` can be sent in 3 situations: 1. Initial message in a xDS bidirectional gRPC stream. - 2. As an ACK or NACK response to a previous `IncrementalDiscoveryResponse`. + 2. As an ACK or NACK response to a previous `DeltaDiscoveryResponse`. In this case the `response_nonce` is set to the nonce value in the Response. ACK or NACK is determined by the absence or presence of `error_detail`. - 3. Spontaneous `IncrementalDiscoveryRequest` from the client. + 3. Spontaneous `DeltaDiscoveryRequest` from the client. This can be done to dynamically add or remove elements from the tracked `resource_names` set. In this case `response_nonce` must be omitted. @@ -326,8 +324,8 @@ client spontaneously requests the "wc" resource. ![Incremental session example](diagrams/incremental.svg) -On reconnect the xDS Incremental client may tell the server of its known resources -to avoid resending them over the network. +On reconnect the Incremental xDS client may tell the server of its known +resources to avoid resending them over the network. ![Incremental reconnect example](diagrams/incremental-reconnect.svg) diff --git a/api/envoy/api/v2/cds.proto b/api/envoy/api/v2/cds.proto index 28fa6aaba9f2b..30f1f15e93004 100644 --- a/api/envoy/api/v2/cds.proto +++ b/api/envoy/api/v2/cds.proto @@ -36,8 +36,7 @@ service ClusterDiscoveryService { rpc StreamClusters(stream DiscoveryRequest) returns (stream DiscoveryResponse) { } - rpc IncrementalClusters(stream IncrementalDiscoveryRequest) - returns (stream IncrementalDiscoveryResponse) { + rpc DeltaClusters(stream DeltaDiscoveryRequest) returns (stream DeltaDiscoveryResponse) { } rpc FetchClusters(DiscoveryRequest) returns (DiscoveryResponse) { diff --git a/api/envoy/api/v2/core/config_source.proto b/api/envoy/api/v2/core/config_source.proto index 3be59c1886b5f..0b5a1ec4bf9e5 100644 --- a/api/envoy/api/v2/core/config_source.proto +++ b/api/envoy/api/v2/core/config_source.proto @@ -32,6 +32,13 @@ message ApiConfigSource { REST = 1; // gRPC v2 API. GRPC = 2; + // Using the delta xDS gRPC service, i.e. DeltaDiscovery{Request,Response} + // rather than Discovery{Request,Response}. Rather than sending Envoy the entire state + // with every update, the xDS server only sends what has changed since the last update. + // + // DELTA_GRPC is not yet entirely implemented! Initially, only CDS is available. + // Do not use for other xDSes. TODO(fredlas) update/remove this warning when appropriate. + DELTA_GRPC = 3; } ApiType api_type = 1 [(validate.rules).enum.defined_only = true]; // Cluster names should be used only with REST. If > 1 diff --git a/api/envoy/api/v2/discovery.proto b/api/envoy/api/v2/discovery.proto index ff04dc20f8b41..5bff0ddae2ee3 100644 --- a/api/envoy/api/v2/discovery.proto +++ b/api/envoy/api/v2/discovery.proto @@ -102,33 +102,32 @@ message DiscoveryResponse { core.ControlPlane control_plane = 6; } -// IncrementalDiscoveryRequest and IncrementalDiscoveryResponse are used in a -// new gRPC endpoint for Incremental xDS. The feature is not supported for REST -// management servers. +// DeltaDiscoveryRequest and DeltaDiscoveryResponse are used in a new gRPC +// endpoint for Delta xDS. // -// With Incremental xDS, the IncrementalDiscoveryResponses do not need to -// include a full snapshot of the tracked resources. Instead -// IncrementalDiscoveryResponses are a diff to the state of a xDS client. -// In Incremental XDS there are per resource versions which allows to track -// state at the resource granularity. -// An xDS Incremental session is always in the context of a gRPC bidirectional +// With Delta xDS, the DeltaDiscoveryResponses do not need to include a full +// snapshot of the tracked resources. Instead, DeltaDiscoveryResponses are a +// diff to the state of a xDS client. +// In Delta XDS there are per resource versions, which allow tracking state at +// the resource granularity. +// An xDS Delta session is always in the context of a gRPC bidirectional // stream. This allows the xDS server to keep track of the state of xDS clients // connected to it. // -// In Incremental xDS the nonce field is required and used to pair -// IncrementalDiscoveryResponse to a IncrementalDiscoveryRequest ACK or NACK. +// In Delta xDS the nonce field is required and used to pair +// DeltaDiscoveryResponse to a DeltaDiscoveryRequest ACK or NACK. // Optionally, a response message level system_version_info is present for // debugging purposes only. // -// IncrementalDiscoveryRequest can be sent in 3 situations: +// DeltaDiscoveryRequest can be sent in 3 situations: // 1. Initial message in a xDS bidirectional gRPC stream. -// 2. As a ACK or NACK response to a previous IncrementalDiscoveryResponse. +// 2. As a ACK or NACK response to a previous DeltaDiscoveryResponse. // In this case the response_nonce is set to the nonce value in the Response. // ACK or NACK is determined by the absence or presence of error_detail. -// 3. Spontaneous IncrementalDiscoveryRequest from the client. +// 3. Spontaneous DeltaDiscoveryRequest from the client. // This can be done to dynamically add or remove elements from the tracked // resource_names set. In this case response_nonce must be omitted. -message IncrementalDiscoveryRequest { +message DeltaDiscoveryRequest { // The node making the request. core.Node node = 1; @@ -138,18 +137,18 @@ message IncrementalDiscoveryRequest { // required for ADS. string type_url = 2; - // IncrementalDiscoveryRequests allow the client to add or remove individual + // DeltaDiscoveryRequests allow the client to add or remove individual // resources to the set of tracked resources in the context of a stream. // All resource names in the resource_names_subscribe list are added to the // set of tracked resources and all resource names in the resource_names_unsubscribe // list are removed from the set of tracked resources. - // Unlike in non incremental xDS, an empty resource_names_subscribe or + // Unlike in state-of-the-world xDS, an empty resource_names_subscribe or // resource_names_unsubscribe list simply means that no resources are to be // added or removed to the resource list. // The xDS server must send updates for all tracked resources but can also // send updates for resources the client has not subscribed to. This behavior - // is similar to non incremental xDS. - // These two fields can be set for all types of IncrementalDiscoveryRequests + // is similar to state-of-the-world xDS. + // These two fields can be set for all types of DeltaDiscoveryRequests // (initial, ACK/NACK or spontaneous). // // A list of Resource names to add to the list of tracked resources. @@ -158,15 +157,17 @@ message IncrementalDiscoveryRequest { // A list of Resource names to remove from the list of tracked resources. repeated string resource_names_unsubscribe = 4; - // This map must be populated when the IncrementalDiscoveryRequest is the - // first in a stream. The keys are the resources names of the xDS resources + // This map must be populated when the DeltaDiscoveryRequest is the + // first in a stream (assuming there are any resources - this field's purpose is to enable + // a session to continue in a reconnected gRPC stream, and so will not be used in the very + // first stream of a session). The keys are the resources names of the xDS resources // known to the xDS client. The values in the map are the associated resource // level version info. map initial_resource_versions = 5; - // When the IncrementalDiscoveryRequest is a ACK or NACK message in response - // to a previous IncrementalDiscoveryResponse, the response_nonce must be the - // nonce in the IncrementalDiscoveryResponse. + // When the DeltaDiscoveryRequest is a ACK or NACK message in response + // to a previous DeltaDiscoveryResponse, the response_nonce must be the + // nonce in the DeltaDiscoveryResponse. // Otherwise response_nonce must be omitted. string response_nonce = 6; @@ -176,24 +177,27 @@ message IncrementalDiscoveryRequest { google.rpc.Status error_detail = 7; } -message IncrementalDiscoveryResponse { +message DeltaDiscoveryResponse { // The version of the response data (used for debugging). string system_version_info = 1; // The response resources. These are typed resources that match the type url - // in the IncrementalDiscoveryRequest. + // in the DeltaDiscoveryRequest. repeated Resource resources = 2 [(gogoproto.nullable) = false]; // Resources names of resources that have be deleted and to be removed from the xDS Client. // Removed resources for missing resources can be ignored. repeated string removed_resources = 6; - // The nonce provides a way for IncrementalDiscoveryRequests to uniquely - // reference a IncrementalDiscoveryResponse. The nonce is required. + // The nonce provides a way for DeltaDiscoveryRequests to uniquely + // reference a DeltaDiscoveryResponse. The nonce is required. string nonce = 5; } message Resource { + // The resource's name, to distinguish it from others of the same type of resource. + string name = 3; + // The resource level version. It allows xDS to track the state of individual // resources. string version = 1; diff --git a/api/envoy/api/v2/rds.proto b/api/envoy/api/v2/rds.proto index 8d41b384ba9bd..d75b68af6791f 100644 --- a/api/envoy/api/v2/rds.proto +++ b/api/envoy/api/v2/rds.proto @@ -33,8 +33,7 @@ service RouteDiscoveryService { rpc StreamRoutes(stream DiscoveryRequest) returns (stream DiscoveryResponse) { } - rpc IncrementalRoutes(stream IncrementalDiscoveryRequest) - returns (stream IncrementalDiscoveryResponse) { + rpc DeltaRoutes(stream DeltaDiscoveryRequest) returns (stream DeltaDiscoveryResponse) { } rpc FetchRoutes(DiscoveryRequest) returns (DiscoveryResponse) { diff --git a/api/envoy/service/discovery/v2/ads.proto b/api/envoy/service/discovery/v2/ads.proto index 73f272191bd01..6a9d044ab4bdd 100644 --- a/api/envoy/service/discovery/v2/ads.proto +++ b/api/envoy/service/discovery/v2/ads.proto @@ -32,7 +32,7 @@ service AggregatedDiscoveryService { returns (stream envoy.api.v2.DiscoveryResponse) { } - rpc IncrementalAggregatedResources(stream envoy.api.v2.IncrementalDiscoveryRequest) - returns (stream envoy.api.v2.IncrementalDiscoveryResponse) { + rpc DeltaAggregatedResources(stream envoy.api.v2.DeltaDiscoveryRequest) + returns (stream envoy.api.v2.DeltaDiscoveryResponse) { } } diff --git a/include/envoy/config/BUILD b/include/envoy/config/BUILD index 5c41821d10c80..4696d029075fc 100644 --- a/include/envoy/config/BUILD +++ b/include/envoy/config/BUILD @@ -23,6 +23,7 @@ envoy_cc_library( deps = [ "//include/envoy/stats:stats_macros", "//source/common/protobuf", + "@envoy_api//envoy/api/v2:discovery_cc", ], ) diff --git a/include/envoy/config/subscription.h b/include/envoy/config/subscription.h index 6d103c9c01e1d..b81d769585fb7 100644 --- a/include/envoy/config/subscription.h +++ b/include/envoy/config/subscription.h @@ -3,6 +3,7 @@ #include #include +#include "envoy/api/v2/discovery.pb.h" #include "envoy/common/exception.h" #include "envoy/common/pure.h" #include "envoy/stats/stats_macros.h" @@ -29,6 +30,22 @@ template class SubscriptionCallbacks { virtual void onConfigUpdate(const ResourceVector& resources, const std::string& version_info) PURE; + // TODO(fredlas) it is a HACK that there are two of these. After delta CDS is merged, + // I intend to reimplement all state-of-the-world xDSes' use of onConfigUpdate + // in terms of this delta-style one (and remove the original). + /** + * Called when a delta configuration update is received. + * @param added_resources resources newly added since the previous fetch. + * @param removed_resources names of resources that this fetch instructed to be removed. + * @param system_version_info aggregate response data "version", for debugging. + * @throw EnvoyException with reason if the config changes are rejected. Otherwise the changes + * are accepted. Accepted changes have their version_info reflected in subsequent requests. + */ + virtual void + onConfigUpdate(const Protobuf::RepeatedPtrField& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info) PURE; + /** * Called when either the Subscription is unable to fetch a config update or when onConfigUpdate * invokes an exception. diff --git a/source/common/config/BUILD b/source/common/config/BUILD index 4737db10e65eb..8071c1855276b 100644 --- a/source/common/config/BUILD +++ b/source/common/config/BUILD @@ -97,6 +97,22 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "delta_subscription_lib", + hdrs = ["delta_subscription_impl.h"], + deps = [ + ":grpc_stream_lib", + ":utility_lib", + "//include/envoy/config:subscription_interface", + "//include/envoy/grpc:async_client_interface", + "//include/envoy/upstream:cluster_manager_interface", + "//source/common/common:backoff_lib", + "//source/common/common:minimal_logger_lib", + "//source/common/common:token_bucket_impl_lib", + "//source/common/protobuf", + ], +) + envoy_cc_library( name = "grpc_stream_lib", hdrs = ["grpc_stream.h"], @@ -303,6 +319,7 @@ envoy_cc_library( name = "subscription_factory_lib", hdrs = ["subscription_factory.h"], deps = [ + ":delta_subscription_lib", ":filesystem_subscription_lib", ":grpc_mux_subscription_lib", ":grpc_subscription_lib", diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h new file mode 100644 index 0000000000000..5164d284b6a4a --- /dev/null +++ b/source/common/config/delta_subscription_impl.h @@ -0,0 +1,213 @@ +#pragma once + +#include + +#include "envoy/api/v2/discovery.pb.h" +#include "envoy/common/token_bucket.h" +#include "envoy/config/subscription.h" + +#include "common/common/assert.h" +#include "common/common/backoff_strategy.h" +#include "common/common/logger.h" +#include "common/common/token_bucket_impl.h" +#include "common/config/grpc_stream.h" +#include "common/config/utility.h" +#include "common/grpc/common.h" +#include "common/protobuf/protobuf.h" +#include "common/protobuf/utility.h" + +namespace Envoy { +namespace Config { + +struct ResourceNameDiff { + std::vector added_; + std::vector removed_; +}; + +const char EmptyVersion[] = ""; + +/** + * Manages the logic of a (non-aggregated) delta xDS subscription. + * TODO(fredlas) add aggregation support. + */ +template +class DeltaSubscriptionImpl + : public Subscription, + public GrpcStream { +public: + DeltaSubscriptionImpl(const LocalInfo::LocalInfo& local_info, Grpc::AsyncClientPtr async_client, + Event::Dispatcher& dispatcher, + const Protobuf::MethodDescriptor& service_method, + Runtime::RandomGenerator& random, Stats::Scope& scope, + const RateLimitSettings& rate_limit_settings, SubscriptionStats stats) + : GrpcStream(std::move(async_client), service_method, random, dispatcher, + scope, rate_limit_settings), + type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())), + local_info_(local_info), stats_(stats) { + request_.set_type_url(type_url_); + request_.mutable_node()->MergeFrom(local_info_.node()); + } + + // Enqueues and attempts to send a discovery request, (un)subscribing to resources missing from / + // added to the passed 'resources' argument, relative to resources_. Updates resources_ to + // 'resources'. + void buildAndQueueDiscoveryRequest(const std::vector& resources) { + ResourceNameDiff diff; + std::set_difference(resources.begin(), resources.end(), resource_names_.begin(), + resource_names_.end(), std::inserter(diff.added_, diff.added_.begin())); + std::set_difference(resource_names_.begin(), resource_names_.end(), resources.begin(), + resources.end(), std::inserter(diff.removed_, diff.removed_.begin())); + + for (const auto& added : diff.added_) { + resources_[added] = EmptyVersion; + resource_names_.insert(added); + } + for (const auto& removed : diff.removed_) { + resources_.erase(removed); + resource_names_.erase(removed); + } + queueDiscoveryRequest(diff); + } + + void sendDiscoveryRequest(const ResourceNameDiff& diff) override { + if (!grpcStreamAvailable()) { + ENVOY_LOG(debug, "No stream available to sendDiscoveryRequest for {}", type_url_); + return; // Drop this request; the reconnect will enqueue a new one. + } + if (paused_) { + ENVOY_LOG(trace, "API {} paused during sendDiscoveryRequest().", type_url_); + pending_ = diff; + return; // The unpause will send this request. + } + + request_.clear_resource_names_subscribe(); + request_.clear_resource_names_unsubscribe(); + std::copy(diff.added_.begin(), diff.added_.end(), + Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_subscribe())); + std::copy(diff.removed_.begin(), diff.removed_.end(), + Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_unsubscribe())); + + ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url_, request_.DebugString()); + sendMessage(request_); + request_.clear_error_detail(); + request_.clear_initial_resource_versions(); + } + + void subscribe(const std::vector& resources) { + ENVOY_LOG(debug, "delta subscribe for " + type_url_); + buildAndQueueDiscoveryRequest(resources); + } + + void pause() { + ENVOY_LOG(debug, "Pausing discovery requests for {}", type_url_); + ASSERT(!paused_); + paused_ = true; + } + + void resume() { + ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url_); + ASSERT(paused_); + paused_ = false; + if (pending_.has_value()) { + queueDiscoveryRequest(pending_.value()); + pending_.reset(); + } + } + + // Config::SubscriptionCallbacks + void onConfigUpdate(const Protobuf::RepeatedPtrField& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& version_info) { + callbacks_->onConfigUpdate(added_resources, removed_resources, version_info); + for (const auto& resource : added_resources) { + resources_[resource.name()] = resource.version(); + } + stats_.update_success_.inc(); + stats_.update_attempt_.inc(); + stats_.version_.set(HashUtil::xxHash64(version_info)); + ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", type_url_, + added_resources.size(), removed_resources.size()); + } + + void handleResponse(std::unique_ptr&& message) override { + ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url_, + message->system_version_info()); + + request_.set_response_nonce(message->nonce()); + + try { + onConfigUpdate(message->resources(), message->removed_resources(), + message->system_version_info()); + } catch (const EnvoyException& e) { + stats_.update_rejected_.inc(); + ENVOY_LOG(warn, "delta config for {} rejected: {}", type_url_, e.what()); + stats_.update_attempt_.inc(); + callbacks_->onConfigUpdateFailed(&e); + ::google::rpc::Status* error_detail = request_.mutable_error_detail(); + error_detail->set_code(Grpc::Status::GrpcStatus::Internal); + error_detail->set_message(e.what()); + } + queueDiscoveryRequest(ResourceNameDiff()); // no change to subscribed resources + } + + void handleStreamEstablished() override { + // initial_resource_versions "must be populated for first request in a stream", so guarantee + // that the initial version'd request we're about to enqueue is what gets sent. + clearRequestQueue(); + + request_.Clear(); + for (auto const& resource : resources_) { + (*request_.mutable_initial_resource_versions())[resource.first] = resource.second; + } + request_.set_type_url(type_url_); + request_.mutable_node()->MergeFrom(local_info_.node()); + queueDiscoveryRequest(ResourceNameDiff()); // no change to subscribed resources + } + + void handleEstablishmentFailure() override { + stats_.update_failure_.inc(); + ENVOY_LOG(debug, "delta update for {} failed", type_url_); + stats_.update_attempt_.inc(); + callbacks_->onConfigUpdateFailed(nullptr); + } + + // Config::DeltaSubscription + void start(const std::vector& resources, + SubscriptionCallbacks& callbacks) override { + callbacks_ = &callbacks; + establishNewStream(); + subscribe(resources); + // The attempt stat here is maintained for the purposes of having consistency between ADS and + // individual DeltaSubscriptions. Since ADS is push based and muxed, the notion of an + // "attempt" for a given xDS API combined by ADS is not really that meaningful. + stats_.update_attempt_.inc(); + } + + void updateResources(const std::vector& resources) override { + subscribe(resources); + stats_.update_attempt_.inc(); + } + +private: + // A map from resource name to per-resource version. + std::unordered_map resources_; + // The keys of resources_. Only tracked separately because std::map does not provide an iterator + // into just its keys, e.g. for use in std::set_difference. + std::unordered_set resource_names_; + const std::string type_url_; + SubscriptionCallbacks* callbacks_{}; + // In-flight or previously sent request. + envoy::api::v2::DeltaDiscoveryRequest request_; + // Paused via pause()? + bool paused_{}; + absl::optional pending_; + + const LocalInfo::LocalInfo& local_info_; + + SubscriptionStats stats_; +}; + +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/grpc_stream.h b/source/common/config/grpc_stream.h index 128d2fcab0ec6..2d03e4e88c621 100644 --- a/source/common/config/grpc_stream.h +++ b/source/common/config/grpc_stream.h @@ -12,7 +12,7 @@ namespace Envoy { namespace Config { -// Oversees communication for gRPC xDS implementations (parent to both regular xDS and incremental +// Oversees communication for gRPC xDS implementations (parent to both regular xDS and delta // xDS variants). Reestablishes the gRPC channel when necessary, and provides rate limiting of // requests. template @@ -49,6 +49,14 @@ class GrpcStream : public Grpc::TypedAsyncStreamCallbacks, drainRequests(); } + void clearRequestQueue() { + control_plane_stats_.pending_requests_.sub(request_queue_.size()); + // TODO(fredlas) when we have C++17: request_queue_ = {}; + while (!request_queue_.empty()) { + request_queue_.pop(); + } + } + void establishNewStream() { ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString()); stream_ = async_client_->start(service_method_, *this); diff --git a/source/common/config/subscription_factory.h b/source/common/config/subscription_factory.h index ce6a1e8d69a51..4362291ab7a60 100644 --- a/source/common/config/subscription_factory.h +++ b/source/common/config/subscription_factory.h @@ -8,6 +8,7 @@ #include "envoy/stats/scope.h" #include "envoy/upstream/cluster_manager.h" +#include "common/config/delta_subscription_impl.h" #include "common/config/filesystem_subscription_impl.h" #include "common/config/grpc_mux_subscription_impl.h" #include "common/config/grpc_subscription_impl.h" @@ -67,16 +68,26 @@ class SubscriptionFactory { Utility::apiConfigSourceRequestTimeout(api_config_source), *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(rest_method), stats)); break; - case envoy::api::v2::core::ApiConfigSource::GRPC: { + case envoy::api::v2::core::ApiConfigSource::GRPC: result.reset(new GrpcSubscriptionImpl( local_info, Config::Utility::factoryForGrpcApiConfigSource(cm.grpcAsyncClientManager(), - config.api_config_source(), scope) + api_config_source, scope) ->create(), dispatcher, random, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), stats, scope, Utility::parseRateLimitSettings(api_config_source))); break; + case envoy::api::v2::core::ApiConfigSource::DELTA_GRPC: { + Utility::checkApiConfigSourceSubscriptionBackingCluster(cm.clusters(), api_config_source); + result.reset(new DeltaSubscriptionImpl( + local_info, + Config::Utility::factoryForGrpcApiConfigSource(cm.grpcAsyncClientManager(), + api_config_source, scope) + ->create(), + dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), + random, scope, Utility::parseRateLimitSettings(api_config_source), stats)); + break; } default: NOT_REACHED_GCOVR_EXCL_LINE; diff --git a/source/common/config/utility.cc b/source/common/config/utility.cc index cfe2fdbc06af3..d8b11b71d97f6 100644 --- a/source/common/config/utility.cc +++ b/source/common/config/utility.cc @@ -89,7 +89,8 @@ void Utility::checkFilesystemSubscriptionBackingPath(const std::string& path, Ap void Utility::checkApiConfigSourceNames( const envoy::api::v2::core::ApiConfigSource& api_config_source) { const bool is_grpc = - (api_config_source.api_type() == envoy::api::v2::core::ApiConfigSource::GRPC); + (api_config_source.api_type() == envoy::api::v2::core::ApiConfigSource::GRPC || + api_config_source.api_type() == envoy::api::v2::core::ApiConfigSource::DELTA_GRPC); if (api_config_source.cluster_names().empty() && api_config_source.grpc_services().empty()) { throw EnvoyException( @@ -99,19 +100,19 @@ void Utility::checkApiConfigSourceNames( if (is_grpc) { if (!api_config_source.cluster_names().empty()) { - throw EnvoyException(fmt::format( - "envoy::api::v2::core::ConfigSource::GRPC must not have a cluster name specified: {}", - api_config_source.DebugString())); + throw EnvoyException(fmt::format("envoy::api::v2::core::ConfigSource::(DELTA_)GRPC " + "must not have a cluster name specified: {}", + api_config_source.DebugString())); } if (api_config_source.grpc_services().size() > 1) { - throw EnvoyException(fmt::format( - "envoy::api::v2::core::ConfigSource::GRPC must have a single gRPC service specified: {}", - api_config_source.DebugString())); + throw EnvoyException(fmt::format("envoy::api::v2::core::ConfigSource::(DELTA_)GRPC " + "must have a single gRPC service specified: {}", + api_config_source.DebugString())); } } else { if (!api_config_source.grpc_services().empty()) { throw EnvoyException( - fmt::format("envoy::api::v2::core::ConfigSource, if not of type gRPC, must not have " + fmt::format("envoy::api::v2::core::ConfigSource, if not a gRPC type, must not have " "a gRPC service specified: {}", api_config_source.DebugString())); } @@ -126,6 +127,7 @@ void Utility::checkApiConfigSourceNames( void Utility::validateClusterName(const Upstream::ClusterManager::ClusterInfoMap& clusters, const std::string& cluster_name) { const auto& it = clusters.find(cluster_name); + if (it == clusters.end() || it->second.get().info()->addedViaApi() || it->second.get().info()->type() == envoy::api::v2::Cluster::EDS) { throw EnvoyException(fmt::format( @@ -248,8 +250,9 @@ Grpc::AsyncClientFactoryPtr Utility::factoryForGrpcApiConfigSource( const envoy::api::v2::core::ApiConfigSource& api_config_source, Stats::Scope& scope) { Utility::checkApiConfigSourceNames(api_config_source); - if (api_config_source.api_type() != envoy::api::v2::core::ApiConfigSource::GRPC) { - throw EnvoyException(fmt::format("envoy::api::v2::core::ConfigSource type must be GRPC: {}", + if (api_config_source.api_type() != envoy::api::v2::core::ApiConfigSource::GRPC && + api_config_source.api_type() != envoy::api::v2::core::ApiConfigSource::DELTA_GRPC) { + throw EnvoyException(fmt::format("envoy::api::v2::core::ConfigSource type must be gRPC: {}", api_config_source.DebugString())); } diff --git a/source/common/router/rds_impl.h b/source/common/router/rds_impl.h index 4498eb99bb104..67e086258e21b 100644 --- a/source/common/router/rds_impl.h +++ b/source/common/router/rds_impl.h @@ -107,7 +107,12 @@ class RdsRouteConfigSubscription } // Config::SubscriptionCallbacks + // TODO(fredlas) deduplicate void onConfigUpdate(const ResourceVector& resources, const std::string& version_info) override; + void onConfigUpdate(const Protobuf::RepeatedPtrField&, + const Protobuf::RepeatedPtrField&, const std::string&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } void onConfigUpdateFailed(const EnvoyException* e) override; std::string resourceName(const ProtobufWkt::Any& resource) override { return MessageUtil::anyConvert(resource).name(); diff --git a/source/common/secret/sds_api.h b/source/common/secret/sds_api.h index b19df9d54023b..6123159b372fe 100644 --- a/source/common/secret/sds_api.h +++ b/source/common/secret/sds_api.h @@ -40,7 +40,12 @@ class SdsApi : public Init::Target, void initialize(std::function callback) override; // Config::SubscriptionCallbacks + // TODO(fredlas) deduplicate void onConfigUpdate(const ResourceVector& resources, const std::string& version_info) override; + void onConfigUpdate(const Protobuf::RepeatedPtrField&, + const Protobuf::RepeatedPtrField&, const std::string&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } void onConfigUpdateFailed(const EnvoyException* e) override; std::string resourceName(const ProtobufWkt::Any& resource) override { return MessageUtil::anyConvert(resource).name(); diff --git a/source/common/stats/stat_data_allocator_impl.h b/source/common/stats/stat_data_allocator_impl.h index 203f9aef1ed35..4b17527d3b8ce 100644 --- a/source/common/stats/stat_data_allocator_impl.h +++ b/source/common/stats/stat_data_allocator_impl.h @@ -138,7 +138,7 @@ template class GaugeImpl : public Gauge, public MetricImpl { } virtual void sub(uint64_t amount) override { ASSERT(data_.value_ >= amount); - ASSERT(used()); + ASSERT(used() || amount == 0); data_.value_ -= amount; } virtual uint64_t value() const override { return data_.value_; } diff --git a/source/common/upstream/cds_api_impl.cc b/source/common/upstream/cds_api_impl.cc index c3eb5c76c07a5..805ffe41eed7b 100644 --- a/source/common/upstream/cds_api_impl.cc +++ b/source/common/upstream/cds_api_impl.cc @@ -34,31 +34,47 @@ CdsApiImpl::CdsApiImpl(const envoy::api::v2::core::ConfigSource& cds_config, Clu Config::SubscriptionFactory::subscriptionFromConfigSource( cds_config, local_info, dispatcher, cm, random, *scope_, "envoy.api.v2.ClusterDiscoveryService.FetchClusters", - "envoy.api.v2.ClusterDiscoveryService.StreamClusters", api); + "envoy.api.v2.ClusterDiscoveryService.DeltaClusters", api); } void CdsApiImpl::onConfigUpdate(const ResourceVector& resources, const std::string& version_info) { + ClusterManager::ClusterInfoMap clusters_to_remove = cm_.clusters(); + for (const auto& cluster : resources) { + clusters_to_remove.erase(cluster.name()); + } + Protobuf::RepeatedPtrField to_remove_repeated; + for (const auto& cluster : clusters_to_remove) { + *to_remove_repeated.Add() = cluster.first; + } + Protobuf::RepeatedPtrField to_add_repeated; + for (const auto& cluster : resources) { + envoy::api::v2::Resource* to_add = to_add_repeated.Add(); + to_add->set_name(cluster.name()); + to_add->set_version(version_info); + to_add->mutable_resource()->PackFrom(cluster); + } + onConfigUpdate(to_add_repeated, to_remove_repeated, version_info); +} + +void CdsApiImpl::onConfigUpdate( + const Protobuf::RepeatedPtrField& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info) { cm_.adsMux().pause(Config::TypeUrl::get().ClusterLoadAssignment); Cleanup eds_resume([this] { cm_.adsMux().resume(Config::TypeUrl::get().ClusterLoadAssignment); }); std::vector exception_msgs; std::unordered_set cluster_names; - for (const auto& cluster : resources) { - if (!cluster_names.insert(cluster.name()).second) { - throw EnvoyException(fmt::format("duplicate cluster {} found", cluster.name())); - } - } - for (const auto& cluster : resources) { - MessageUtil::validate(cluster); - } - // We need to keep track of which clusters we might need to remove. - ClusterManager::ClusterInfoMap clusters_to_remove = cm_.clusters(); - for (auto& cluster : resources) { - const std::string cluster_name = cluster.name(); + for (const auto& resource : added_resources) { + envoy::api::v2::Cluster cluster; try { - clusters_to_remove.erase(cluster_name); + cluster = MessageUtil::anyConvert(resource.resource()); + MessageUtil::validate(cluster); + if (!cluster_names.insert(cluster.name()).second) { + throw EnvoyException(fmt::format("duplicate cluster {} found", cluster.name())); + } if (cm_.addOrUpdateCluster( - cluster, version_info, + cluster, resource.version(), [this](const std::string&, ClusterManager::ClusterWarmingState state) { // Following if/else block implements a control flow mechanism that can be used // by an ADS implementation to properly sequence CDS and RDS update. It is not @@ -85,26 +101,24 @@ void CdsApiImpl::onConfigUpdate(const ResourceVector& resources, const std::stri cm_.adsMux().resume(Config::TypeUrl::get().Cluster); } })) { - ENVOY_LOG(debug, "cds: add/update cluster '{}'", cluster_name); + ENVOY_LOG(debug, "cds: add/update cluster '{}'", cluster.name()); } } catch (const EnvoyException& e) { - exception_msgs.push_back(fmt::format("{}: {}", cluster_name, e.what())); + exception_msgs.push_back(fmt::format("{}: {}", cluster.name(), e.what())); } } - - for (auto cluster : clusters_to_remove) { - const std::string cluster_name = cluster.first; - if (cm_.removeCluster(cluster_name)) { - ENVOY_LOG(debug, "cds: remove cluster '{}'", cluster_name); + for (auto resource_name : removed_resources) { + if (cm_.removeCluster(resource_name)) { + ENVOY_LOG(debug, "cds: remove cluster '{}'", resource_name); } } - version_info_ = version_info; runInitializeCallbackIfAny(); if (!exception_msgs.empty()) { throw EnvoyException( fmt::format("Error adding/updating cluster(s) {}", StringUtil::join(exception_msgs, ", "))); } + system_version_info_ = system_version_info; } void CdsApiImpl::onConfigUpdateFailed(const EnvoyException*) { diff --git a/source/common/upstream/cds_api_impl.h b/source/common/upstream/cds_api_impl.h index 8fb1fb1cc3035..77e215259fd75 100644 --- a/source/common/upstream/cds_api_impl.h +++ b/source/common/upstream/cds_api_impl.h @@ -32,10 +32,13 @@ class CdsApiImpl : public CdsApi, void setInitializedCb(std::function callback) override { initialize_callback_ = callback; } - const std::string versionInfo() const override { return version_info_; } + const std::string versionInfo() const override { return system_version_info_; } // Config::SubscriptionCallbacks + // TODO(fredlas) deduplicate void onConfigUpdate(const ResourceVector& resources, const std::string& version_info) override; + void onConfigUpdate(const Protobuf::RepeatedPtrField&, + const Protobuf::RepeatedPtrField&, const std::string&) override; void onConfigUpdateFailed(const EnvoyException* e) override; std::string resourceName(const ProtobufWkt::Any& resource) override { return MessageUtil::anyConvert(resource).name(); @@ -49,7 +52,7 @@ class CdsApiImpl : public CdsApi, ClusterManager& cm_; std::unique_ptr> subscription_; - std::string version_info_; + std::string system_version_info_; std::function initialize_callback_; Stats::ScopePtr scope_; }; diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 07d63b062ef07..0e9e8eb52667d 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -178,6 +178,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable callback) override { init_helper_.setInitializedCb(callback); } + ClusterInfoMap clusters() override { // TODO(mattklein123): Add ability to see warming clusters in admin output. ClusterInfoMap clusters_map; diff --git a/source/common/upstream/eds.h b/source/common/upstream/eds.h index 05a2010105231..9bfbfef7beb98 100644 --- a/source/common/upstream/eds.h +++ b/source/common/upstream/eds.h @@ -28,7 +28,12 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, InitializePhase initializePhase() const override { return InitializePhase::Secondary; } // Config::SubscriptionCallbacks + // TODO(fredlas) deduplicate void onConfigUpdate(const ResourceVector& resources, const std::string& version_info) override; + void onConfigUpdate(const Protobuf::RepeatedPtrField&, + const Protobuf::RepeatedPtrField&, const std::string&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } void onConfigUpdateFailed(const EnvoyException* e) override; std::string resourceName(const ProtobufWkt::Any& resource) override { return MessageUtil::anyConvert(resource).cluster_name(); diff --git a/source/server/lds_api.h b/source/server/lds_api.h index fefea2e171564..713ead3f118a6 100644 --- a/source/server/lds_api.h +++ b/source/server/lds_api.h @@ -34,7 +34,12 @@ class LdsApiImpl : public LdsApi, void initialize(std::function callback) override; // Config::SubscriptionCallbacks + // TODO(fredlas) deduplicate void onConfigUpdate(const ResourceVector& resources, const std::string& version_info) override; + void onConfigUpdate(const Protobuf::RepeatedPtrField&, + const Protobuf::RepeatedPtrField&, const std::string&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } void onConfigUpdateFailed(const EnvoyException* e) override; std::string resourceName(const ProtobufWkt::Any& resource) override { return MessageUtil::anyConvert(resource).name(); diff --git a/source/server/server.cc b/source/server/server.cc index 1649f4f3a6a03..62e9ae99a902f 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -67,7 +67,6 @@ InstanceImpl::InstanceImpl(const Options& options, Event::TimeSystem& time_syste terminated_(false), mutex_tracer_(options.mutexTracingEnabled() ? &Envoy::MutexTracerImpl::getOrCreateTracer() : nullptr) { - try { if (!options.logPath().empty()) { try { diff --git a/test/common/config/config_provider_impl_test.cc b/test/common/config/config_provider_impl_test.cc index 086ab59d98d14..f25f77a0b1dd8 100644 --- a/test/common/config/config_provider_impl_test.cc +++ b/test/common/config/config_provider_impl_test.cc @@ -52,6 +52,7 @@ class DummyConfigSubscription void start() override {} // Envoy::Config::SubscriptionCallbacks + // TODO(fredlas) deduplicate void onConfigUpdate(const ResourceVector& resources, const std::string& version_info) override { const auto& config = resources[0]; if (checkAndApplyConfig(config, "dummy_config", version_info)) { @@ -60,6 +61,10 @@ class DummyConfigSubscription ConfigSubscriptionInstanceBase::onConfigUpdate(); } + void onConfigUpdate(const Protobuf::RepeatedPtrField&, + const Protobuf::RepeatedPtrField&, const std::string&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } // Envoy::Config::SubscriptionCallbacks void onConfigUpdateFailed(const EnvoyException*) override {} diff --git a/test/common/config/subscription_factory_test.cc b/test/common/config/subscription_factory_test.cc index 9c457c5f078dd..81babecf65c48 100644 --- a/test/common/config/subscription_factory_test.cc +++ b/test/common/config/subscription_factory_test.cc @@ -172,9 +172,9 @@ TEST_F(SubscriptionFactoryTest, GrpcClusterMultiton) { EXPECT_CALL(*cluster.info_, addedViaApi()).WillRepeatedly(Return(false)); EXPECT_CALL(*cluster.info_, type()).WillRepeatedly(Return(envoy::api::v2::Cluster::STATIC)); - EXPECT_THROW_WITH_REGEX( - subscriptionFromConfigSource(config), EnvoyException, - "envoy::api::v2::core::ConfigSource::GRPC must have a single gRPC service specified:"); + EXPECT_THROW_WITH_REGEX(subscriptionFromConfigSource(config), EnvoyException, + "envoy::api::v2::core::ConfigSource::.DELTA_.GRPC must have a " + "single gRPC service specified:"); } TEST_F(SubscriptionFactoryTest, FilesystemSubscription) { diff --git a/test/common/config/utility_test.cc b/test/common/config/utility_test.cc index c48eacd759077..9c4ab302f695b 100644 --- a/test/common/config/utility_test.cc +++ b/test/common/config/utility_test.cc @@ -268,7 +268,8 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { EXPECT_THROW_WITH_REGEX( Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, scope), EnvoyException, - "envoy::api::v2::core::ConfigSource::GRPC must have a single gRPC service specified:"); + "envoy::api::v2::core::ConfigSource::.DELTA_.GRPC must have a single gRPC service " + "specified:"); } { @@ -279,7 +280,8 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { EXPECT_THROW_WITH_REGEX( Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, scope), EnvoyException, - "envoy::api::v2::core::ConfigSource::GRPC must not have a cluster name specified:"); + "envoy::api::v2::core::ConfigSource::.DELTA_.GRPC must not have a cluster name " + "specified:"); } { @@ -290,7 +292,8 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { EXPECT_THROW_WITH_REGEX( Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, scope), EnvoyException, - "envoy::api::v2::core::ConfigSource::GRPC must not have a cluster name specified:"); + "envoy::api::v2::core::ConfigSource::.DELTA_.GRPC must not have a cluster name " + "specified:"); } { @@ -301,7 +304,7 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { EXPECT_THROW_WITH_REGEX( Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, scope), EnvoyException, - "envoy::api::v2::core::ConfigSource, if not of type gRPC, must not have a gRPC service " + "envoy::api::v2::core::ConfigSource, if not a gRPC type, must not have a gRPC service " "specified:"); } @@ -311,7 +314,7 @@ TEST(UtilityTest, FactoryForGrpcApiConfigSource) { api_config_source.add_cluster_names("foo"); EXPECT_THROW_WITH_REGEX( Utility::factoryForGrpcApiConfigSource(async_client_manager, api_config_source, scope), - EnvoyException, "envoy::api::v2::core::ConfigSource type must be GRPC:"); + EnvoyException, "envoy::api::v2::core::ConfigSource type must be gRPC:"); } { @@ -388,7 +391,8 @@ TEST(CheckApiConfigSourceSubscriptionBackingClusterTest, GrpcClusterTestAcrossTy EXPECT_THROW_WITH_REGEX( Utility::checkApiConfigSourceSubscriptionBackingCluster(cluster_map, *api_config_source), EnvoyException, - "envoy::api::v2::core::ConfigSource::GRPC must not have a cluster name specified:"); + "envoy::api::v2::core::ConfigSource::.DELTA_.GRPC must not have a cluster name " + "specified:"); } TEST(CheckApiConfigSourceSubscriptionBackingClusterTest, RestClusterTestAcrossTypes) { diff --git a/test/common/upstream/cds_api_impl_test.cc b/test/common/upstream/cds_api_impl_test.cc index e9fa17c85acc1..d18aee1f7042d 100644 --- a/test/common/upstream/cds_api_impl_test.cc +++ b/test/common/upstream/cds_api_impl_test.cc @@ -18,6 +18,7 @@ #include "gtest/gtest.h" using testing::_; +using testing::AnyNumber; using testing::InSequence; using testing::Invoke; using testing::Return; @@ -48,14 +49,12 @@ class CdsApiImplTest : public testing::Test { Config::Utility::translateCdsConfig(*config, cds_config); cds_config.mutable_api_config_source()->set_api_type( envoy::api::v2::core::ApiConfigSource::REST); - Upstream::ClusterManager::ClusterInfoMap cluster_map; - Upstream::MockClusterMockPrioritySet cluster; - cluster_map.emplace("foo_cluster", cluster); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map)); - EXPECT_CALL(cluster, info()); - EXPECT_CALL(*cluster.info_, addedViaApi()); - EXPECT_CALL(cluster, info()); - EXPECT_CALL(*cluster.info_, type()); + cluster_map_.emplace("foo_cluster", mock_cluster_); + EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_)); + EXPECT_CALL(mock_cluster_, info()).Times(AnyNumber()); + EXPECT_CALL(*mock_cluster_.info_, addedViaApi()); + EXPECT_CALL(mock_cluster_, info()).Times(AnyNumber()); + EXPECT_CALL(*mock_cluster_.info_, type()); cds_ = CdsApiImpl::create(cds_config, cm_, dispatcher_, random_, local_info_, store_, *api_); resetCdsInitializedCb(); @@ -167,6 +166,8 @@ class CdsApiImplTest : public testing::Test { }; NiceMock cm_; + Upstream::ClusterManager::ClusterInfoMap cluster_map_; + Upstream::MockClusterMockPrioritySet mock_cluster_; NiceMock dispatcher_; NiceMock random_; NiceMock local_info_; @@ -188,8 +189,9 @@ TEST_F(CdsApiImplTest, ValidateFail) { Protobuf::RepeatedPtrField clusters; clusters.Add(); - EXPECT_THROW(dynamic_cast(cds_.get())->onConfigUpdate(clusters, ""), - ProtoValidationException); + EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_)); + EXPECT_CALL(initialized_, ready()); + EXPECT_THROW(dynamic_cast(cds_.get())->onConfigUpdate(clusters, ""), EnvoyException); EXPECT_CALL(request_, cancel()); } @@ -206,8 +208,12 @@ TEST_F(CdsApiImplTest, ValidateDuplicateClusters) { auto* cluster_2 = clusters.Add(); cluster_2->set_name("duplicate_cluster"); + EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_)); + EXPECT_CALL(initialized_, ready()); EXPECT_THROW_WITH_MESSAGE(dynamic_cast(cds_.get())->onConfigUpdate(clusters, ""), - EnvoyException, "duplicate cluster duplicate_cluster found"); + EnvoyException, + "Error adding/updating cluster(s) duplicate_cluster: duplicate cluster " + "duplicate_cluster found"); EXPECT_CALL(request_, cancel()); } @@ -362,6 +368,7 @@ version_info: '1' TEST_F(CdsApiImplTest, CdsPauseOnWarming) { interval_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(ClusterManager::ClusterInfoMap{})); InSequence s; setup(); @@ -385,7 +392,6 @@ version_info: '0' // Two clusters updated, both warmed up. EXPECT_CALL(cm_.ads_mux_, pause(Config::TypeUrl::get().ClusterLoadAssignment)).Times(1); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); cm_.expectAddWithWarming("cluster1", "0"); cm_.expectWarmingClusterCount(); EXPECT_CALL(cm_.ads_mux_, pause(Config::TypeUrl::get().Cluster)).Times(1); @@ -421,7 +427,6 @@ version_info: '1' )EOF"; EXPECT_CALL(cm_.ads_mux_, pause(Config::TypeUrl::get().ClusterLoadAssignment)).Times(1); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); cm_.expectAddWithWarming("cluster1", "1"); cm_.expectWarmingClusterCount(); EXPECT_CALL(cm_.ads_mux_, pause(Config::TypeUrl::get().Cluster)).Times(1); @@ -451,7 +456,6 @@ version_info: '2' )EOF"; EXPECT_CALL(cm_.ads_mux_, pause(Config::TypeUrl::get().ClusterLoadAssignment)).Times(1); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); cm_.expectAddWithWarming("cluster4", "2"); cm_.expectWarmingClusterCount(); EXPECT_CALL(initialized_, ready()); @@ -485,7 +489,6 @@ version_info: '3' // Two clusters updated, first one warmed up before processing of the second one starts. EXPECT_CALL(cm_.ads_mux_, pause(Config::TypeUrl::get().ClusterLoadAssignment)).Times(1); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); cm_.expectAddWithWarming("cluster5", "3", true); cm_.expectWarmingClusterCount(); EXPECT_CALL(cm_.ads_mux_, pause(Config::TypeUrl::get().Cluster)).Times(1); @@ -527,6 +530,7 @@ version_info: '0' path: eds path )EOF"; + EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_)); EXPECT_CALL(initialized_, ready()); EXPECT_CALL(*interval_timer_, enableTimer(_)); callbacks_->onSuccess(parseResponseMessageFromYaml(response_yaml)); @@ -535,6 +539,7 @@ version_info: '0' interval_timer_->callback_(); EXPECT_CALL(*interval_timer_, enableTimer(_)); + callbacks_->onFailure(Http::AsyncClient::FailureReason::Reset); EXPECT_EQ("", cds_->versionInfo()); diff --git a/test/integration/BUILD b/test/integration/BUILD index 8635220d0bc8d..9ed0685ea7f17 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -96,6 +96,27 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "delta_cds_integration_test", + srcs = ["delta_cds_integration_test.cc"], + data = [ + "//test/config/integration/certs", + ], + deps = [ + ":http_integration_lib", + "//source/common/config:protobuf_link_hacks", + "//source/common/config:resources_lib", + "//source/common/protobuf:utility_lib", + "//test/common/grpc:grpc_client_integration_lib", + "//test/mocks/runtime:runtime_mocks", + "//test/mocks/server:server_mocks", + "//test/test_common:network_utility_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/api/v2:cds_cc", + "@envoy_api//envoy/api/v2:discovery_cc", + ], +) + exports_files(["test_utility.sh"]) envoy_sh_test( diff --git a/test/integration/delta_cds_integration_test.cc b/test/integration/delta_cds_integration_test.cc new file mode 100644 index 0000000000000..42381b7f9c5af --- /dev/null +++ b/test/integration/delta_cds_integration_test.cc @@ -0,0 +1,324 @@ +#include "envoy/api/v2/cds.pb.h" +#include "envoy/api/v2/discovery.pb.h" +#include "envoy/grpc/status.h" +#include "envoy/stats/scope.h" + +#include "common/config/protobuf_link_hacks.h" +#include "common/config/resources.h" +#include "common/protobuf/protobuf.h" +#include "common/protobuf/utility.h" + +#include "test/common/grpc/grpc_client_integration.h" +#include "test/integration/http_integration.h" +#include "test/integration/utility.h" +#include "test/mocks/server/mocks.h" +#include "test/test_common/network_utility.h" +#include "test/test_common/simulated_time_system.h" +#include "test/test_common/utility.h" + +#include "absl/synchronization/notification.h" +#include "gtest/gtest.h" + +using testing::AssertionFailure; +using testing::AssertionResult; +using testing::AssertionSuccess; +using testing::IsSubstring; + +namespace Envoy { +namespace { + +// TODO(fredlas) Move to test/config/utility.cc once there are other xDS tests that use gRPC. +const char Config[] = R"EOF( +admin: + access_log_path: /dev/null + address: + socket_address: + address: 127.0.0.1 + port_value: 0 +dynamic_resources: + cds_config: + api_config_source: + api_type: DELTA_GRPC + grpc_services: + envoy_grpc: + cluster_name: my_cds_cluster +static_resources: + clusters: + - name: my_cds_cluster + http2_protocol_options: {} + hosts: + socket_address: + address: 127.0.0.1 + port_value: 0 + listeners: + name: http + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + filter_chains: + filters: + name: envoy.http_connection_manager + config: + stat_prefix: config_test + http_filters: + name: envoy.router + codec_type: HTTP2 + route_config: + name: route_config_0 + validate_clusters: false + virtual_hosts: + name: integration + routes: + - route: + cluster: cluster_1 + match: + prefix: "/cluster1" + - route: + cluster: cluster_2 + match: + prefix: "/cluster2" + domains: "*" +)EOF"; +const char ClusterName1[] = "cluster_1"; +const char ClusterName2[] = "cluster_2"; +const int UpstreamIndex1 = 1; +const int UpstreamIndex2 = 2; + +class DeltaCdsIntegrationTest : public HttpIntegrationTest, + public Grpc::GrpcClientIntegrationParamTest { +public: + DeltaCdsIntegrationTest() + : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, ipVersion(), realTime(), Config) {} + + void TearDown() override { + cleanUpXdsConnection(); + test_server_.reset(); + fake_upstreams_.clear(); + } + + // TODO(fredlas) Move to test/config/utility.cc once there are other xDS tests that use gRPC. + envoy::api::v2::Cluster buildCluster(const std::string& name, int upstream_index) { + return TestUtility::parseYaml( + fmt::format(R"EOF( + name: {} + connect_timeout: 5s + type: STATIC + load_assignment: + cluster_name: {} + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: {} + port_value: {} + lb_policy: ROUND_ROBIN + http2_protocol_options: {{}} + )EOF", + name, name, Network::Test::getLoopbackAddressString(ipVersion()), + fake_upstreams_[upstream_index]->localAddress()->ip()->port())); + } + + // Overridden to insert this stuff into the initialize() at the very beginning of + // HttpIntegrationTest::testRouterRequestAndResponseWithBody(). + void initialize() override { + // Controls how many fake_upstreams_.emplace_back(new FakeUpstream) will happen in + // BaseIntegrationTest::createUpstreams() (which is part of initialize()). + // Make sure this number matches the size of the 'clusters' repeated field in the bootstrap + // config that you use! + setUpstreamCount(1); // the CDS cluster + setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); // CDS uses gRPC uses HTTP2. + + // BaseIntegrationTest::initialize() does many things: + // 1) It appends to fake_upstreams_ as many as you asked for via setUpstreamCount(). + // 2) It updates your bootstrap config with the ports your fake upstreams are actually listening + // on (since you're supposed to leave them as 0). + // 3) It creates and starts an IntegrationTestServer - the thing that wraps the almost-actual + // Envoy used in the tests. + // 4) Bringing up the server usually entails waiting to ensure that any listeners specified in + // the bootstrap config have come up, and registering them in a port map (see lookupPort()). + // However, this test needs to defer all of that to later. + defer_listener_finalization_ = true; + HttpIntegrationTest::initialize(); + + // Create the regular (i.e. not an xDS server) upstream. We create it manually here after + // initialize() because finalize() expects all fake_upstreams_ to correspond to a static + // cluster in the bootstrap config - which we don't want since we're testing dynamic CDS! + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_, + timeSystem(), enable_half_close_)); + fake_upstreams_[UpstreamIndex1]->set_allow_unexpected_disconnects(false); + + // Now that the upstream has been created, process Envoy's request to discover it. + // (First, we have to let Envoy establish its connection to the CDS server.) + acceptXdsConnection(); + + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {}, {})); + sendDeltaDiscoveryResponse( + {buildCluster(ClusterName1, UpstreamIndex1)}, {}, "55"); + // We can continue the test once we're sure that Envoy's ClusterManager has made use of + // the DiscoveryResponse describing cluster_1 that we sent. + // 2 because the statically specified CDS server itself counts as a cluster. + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); + + // Wait for our statically specified listener to become ready, and register its port in the + // test framework's downstream listener port map. + test_server_->waitUntilListenersReady(); + registerTestServerPorts({"http"}); + } + + void acceptXdsConnection() { + AssertionResult result = // xds_connection_ is filled with the new FakeHttpConnection. + fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + fake_upstreams_[0]->set_allow_unexpected_disconnects(true); + } +}; + +INSTANTIATE_TEST_CASE_P(IpVersionsClientType, DeltaCdsIntegrationTest, + GRPC_CLIENT_INTEGRATION_PARAMS); + +// 1) Envoy starts up with no static clusters (other than the CDS-over-gRPC server). +// 2) Envoy is told of a cluster via CDS. +// 3) We send Envoy a request, which we verify is properly proxied to and served by that cluster. +// 4) Envoy is told that cluster is gone. +// 5) We send Envoy a request, which should 503. +// 6) Envoy is told that the cluster is back. +// 7) We send Envoy a request, which we verify is properly proxied to and served by that cluster. +TEST_P(DeltaCdsIntegrationTest, CdsClusterUpDownUp) { + // Calls our initialize(), which includes establishing a listener, route, and cluster. + testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex1, "/cluster1"); + + // Tell Envoy that cluster_1 is gone. + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {}, {})); + sendDeltaDiscoveryResponse({}, {ClusterName1}, "42"); + // We can continue the test once we're sure that Envoy's ClusterManager has made use of + // the DiscoveryResponse that says cluster_1 is gone. + test_server_->waitForCounterGe("cluster_manager.cluster_removed", 1); + + // Now that cluster_1 is gone, the listener (with its routing to cluster_1) should 503. + BufferingStreamDecoderPtr response = IntegrationUtil::makeSingleRequest( + lookupPort("http"), "GET", "/cluster1", "", downstream_protocol_, version_, "foo.com"); + ASSERT_TRUE(response->complete()); + EXPECT_STREQ("503", response->headers().Status()->value().c_str()); + + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // Tell Envoy that cluster_1 is back. + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {}, {})); + sendDeltaDiscoveryResponse({buildCluster(ClusterName1, UpstreamIndex1)}, + {}, "413"); + + // We can continue the test once we're sure that Envoy's ClusterManager has made use of + // the DiscoveryResponse describing cluster_1 that we sent. Again, 2 includes CDS server. + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); + + // Does *not* call our initialize(). + testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex1, "/cluster1"); + + cleanupUpstreamAndDownstream(); +} + +// Tests adding a cluster, adding another, then removing the first. +TEST_P(DeltaCdsIntegrationTest, TwoClusters) { + // Calls our initialize(), which includes establishing a listener, route, and cluster. + testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex1, "/cluster1"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // Add another fake upstream, to be cluster_2. + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_, + timeSystem(), enable_half_close_)); + fake_upstreams_[UpstreamIndex2]->set_allow_unexpected_disconnects(false); + + // Tell Envoy that cluster_2 is here. + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {}, {})); + sendDeltaDiscoveryResponse({buildCluster(ClusterName2, UpstreamIndex2)}, + {}, "42"); + // The '3' includes the fake CDS server. + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3); + + // A request for cluster_2 should be fine. + testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex2, "/cluster2"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // Tell Envoy that cluster_1 is gone. + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {}, {})); + sendDeltaDiscoveryResponse({}, {ClusterName1}, "42"); + // We can continue the test once we're sure that Envoy's ClusterManager has made use of + // the DiscoveryResponse that says cluster_1 is gone. + test_server_->waitForCounterGe("cluster_manager.cluster_removed", 1); + + // Even with cluster_1 is gone, a request for cluster_2 should be fine. + testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex2, "/cluster2"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // Tell Envoy that cluster_1 is back. + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {}, {})); + sendDeltaDiscoveryResponse({buildCluster(ClusterName1, UpstreamIndex1)}, + {}, "413"); + + // We can continue the test once we're sure that Envoy's ClusterManager has made use of + // the DiscoveryResponse describing cluster_1 that we sent. Again, 3 includes CDS server. + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3); + + // Does *not* call our initialize(). + testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex1, "/cluster1"); + + cleanupUpstreamAndDownstream(); +} + +// Tests that when Envoy's xDS gRPC stream dis/reconnects, Envoy can inform the server of the +// resources it already has: the reconnected stream need not start with a state-of-the-world update. +TEST_P(DeltaCdsIntegrationTest, VersionsRememberedAfterReconnect) { + // Calls our initialize(), which includes establishing a listener, route, and cluster. + testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex1, "/cluster1"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // Close the connection carrying Envoy's xDS gRPC stream... + AssertionResult result = xds_connection_->close(); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForDisconnect(); + RELEASE_ASSERT(result, result.message()); + xds_connection_.reset(); + // ...and reconnect it. + acceptXdsConnection(); + + // Upon reconnecting, the Envoy should tell us its current resource versions. + envoy::api::v2::DeltaDiscoveryRequest request; + result = xds_stream_->waitForGrpcMessage(*dispatcher_, request); + RELEASE_ASSERT(result, result.message()); + const auto& initial_resource_versions = request.initial_resource_versions(); + EXPECT_EQ("55", initial_resource_versions.at(std::string(ClusterName1))); + EXPECT_EQ(1, initial_resource_versions.size()); + + // Add another fake upstream, to be cluster_2. + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_, + timeSystem(), enable_half_close_)); + fake_upstreams_[UpstreamIndex2]->set_allow_unexpected_disconnects(false); + // Tell Envoy that cluster_2 is here. This update does *not* need to include cluster_1, + // which Envoy should already know about despite the disconnect. + sendDeltaDiscoveryResponse({buildCluster(ClusterName2, UpstreamIndex2)}, + {}, "42"); + // The '3' includes the fake CDS server. + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3); + + // A request for cluster_1 should be fine. + testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex1, "/cluster1"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + // A request for cluster_2 should be fine. + testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex2, "/cluster2"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); +} + +} // namespace +} // namespace Envoy diff --git a/test/integration/delta_xds_integration_test_base.cc b/test/integration/delta_xds_integration_test_base.cc new file mode 100644 index 0000000000000..806668c847e73 --- /dev/null +++ b/test/integration/delta_xds_integration_test_base.cc @@ -0,0 +1,91 @@ +#include "test/integration/delta_xds_integration_test_base.h" + +#include "envoy/api/v2/discovery.pb.h" +#include "envoy/grpc/status.h" +#include "envoy/stats/scope.h" + +#include "common/config/resources.h" +#include "common/protobuf/protobuf.h" +#include "common/protobuf/utility.h" + +#include "test/integration/http_integration.h" +#include "test/integration/utility.h" +#include "test/mocks/server/mocks.h" +#include "test/test_common/network_utility.h" +#include "test/test_common/simulated_time_system.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +using testing::AssertionFailure; +using testing::AssertionResult; +using testing::AssertionSuccess; +using testing::IsSubstring; + +namespace Envoy { + +void DeltaXdsIntegrationTestBase::createXdsConnection(FakeUpstream& upstream) { + xds_upstream_ = &upstream; + AssertionResult result = xds_upstream_->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); +} + +void DeltaXdsIntegrationTestBase::cleanUpXdsConnection() { + // Don't ASSERT fail if an xDS reconnect ends up unparented. + if (xds_upstream_) { + xds_upstream_->set_allow_unexpected_disconnects(true); + } + AssertionResult result = xds_connection_->close(); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForDisconnect(); + RELEASE_ASSERT(result, result.message()); + xds_connection_.reset(); +} + +AssertionResult DeltaXdsIntegrationTestBase::compareDiscoveryRequest( + const std::string& expected_type_url, + const std::vector& expected_resource_subscriptions, + const std::vector& expected_resource_unsubscriptions, + const Protobuf::int32 expected_error_code, const std::string& expected_error_message) { + envoy::api::v2::DeltaDiscoveryRequest request; + VERIFY_ASSERTION(xds_stream_->waitForGrpcMessage(*dispatcher_, request)); + + EXPECT_TRUE(request.has_node()); + EXPECT_FALSE(request.node().id().empty()); + EXPECT_FALSE(request.node().cluster().empty()); + + // TODO(PiotrSikora): Remove this hack once fixed internally. + if (!(expected_type_url == request.type_url())) { + return AssertionFailure() << fmt::format("type_url {} does not match expected {}", + request.type_url(), expected_type_url); + } + if (!(expected_error_code == request.error_detail().code())) { + return AssertionFailure() << fmt::format("error_code {} does not match expected {}", + request.error_detail().code(), expected_error_code); + } + EXPECT_TRUE(IsSubstring("", "", expected_error_message, request.error_detail().message())); + + const std::vector resource_subscriptions(request.resource_names_subscribe().cbegin(), + request.resource_names_subscribe().cend()); + if (expected_resource_subscriptions != resource_subscriptions) { + return AssertionFailure() << fmt::format( + "newly subscribed resources {} do not match expected {} in {}", + fmt::join(resource_subscriptions.begin(), resource_subscriptions.end(), ","), + fmt::join(expected_resource_subscriptions.begin(), + expected_resource_subscriptions.end(), ","), + request.DebugString()); + } + const std::vector resource_unsubscriptions( + request.resource_names_unsubscribe().cbegin(), request.resource_names_unsubscribe().cend()); + if (expected_resource_unsubscriptions != resource_unsubscriptions) { + return AssertionFailure() << fmt::format( + "newly UNsubscribed resources {} do not match expected {} in {}", + fmt::join(resource_unsubscriptions.begin(), resource_unsubscriptions.end(), ","), + fmt::join(expected_resource_unsubscriptions.begin(), + expected_resource_unsubscriptions.end(), ","), + request.DebugString()); + } + return AssertionSuccess(); +} + +} // namespace Envoy diff --git a/test/integration/delta_xds_integration_test_base.h b/test/integration/delta_xds_integration_test_base.h new file mode 100644 index 0000000000000..a7b8c80270b1f --- /dev/null +++ b/test/integration/delta_xds_integration_test_base.h @@ -0,0 +1,47 @@ +#pragma once + +#include "envoy/api/v2/discovery.pb.h" +#include "envoy/grpc/status.h" +#include "envoy/stats/scope.h" + +#include "common/config/resources.h" +#include "common/protobuf/protobuf.h" +#include "common/protobuf/utility.h" + +#include "test/integration/http_integration.h" +#include "test/integration/utility.h" +#include "test/mocks/server/mocks.h" +#include "test/test_common/network_utility.h" +#include "test/test_common/simulated_time_system.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +using testing::AssertionFailure; +using testing::AssertionResult; +using testing::AssertionSuccess; +using testing::IsSubstring; + +namespace Envoy { + +class DeltaXdsIntegrationTestBase : public HttpIntegrationTest { +public: + DeltaXdsIntegrationTestBase(Http::CodecClient::Type downstream_protocol, + Network::Address::IpVersion version) + : HttpIntegrationTest(downstream_protocol, version, realTime()) {} + DeltaXdsIntegrationTestBase(Http::CodecClient::Type downstream_protocol, + Network::Address::IpVersion version, const std::string& config) + : HttpIntegrationTest(downstream_protocol, version, realTime(), config) {} + + void createXdsConnection(FakeUpstream& upstream); + + void cleanUpXdsConnection(); + +protected: + FakeUpstream* xds_upstream_{}; + FakeHttpConnectionPtr xds_connection_; + FakeStreamPtr xds_stream_; + testing::NiceMock factory_context_; +}; + +} // namespace Envoy diff --git a/test/integration/hds_integration_test.cc b/test/integration/hds_integration_test.cc index b96cc7007e2ee..dd8fe59451747 100644 --- a/test/integration/hds_integration_test.cc +++ b/test/integration/hds_integration_test.cc @@ -25,7 +25,6 @@ namespace Envoy { namespace { // TODO(jmarantz): switch this to simulated-time after debugging flakes. - class HdsIntegrationTest : public testing::TestWithParam, public HttpIntegrationTest { public: diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index e0b4946d233ff..d8d7eb2951ada 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -345,7 +345,7 @@ void HttpIntegrationTest::testRouterRequestAndResponseWithBody( IntegrationStreamDecoderPtr HttpIntegrationTest::makeHeaderOnlyRequest(ConnectionCreationFunction* create_connection, - int upstream_index) { + int upstream_index, const std::string& path) { // This is called multiple times per test in ads_integration_test. Only call // initialize() the first time. if (!initialized()) { @@ -354,7 +354,7 @@ HttpIntegrationTest::makeHeaderOnlyRequest(ConnectionCreationFunction* create_co codec_client_ = makeHttpConnection( create_connection ? ((*create_connection)()) : makeClientConnection((lookupPort("http")))); Http::TestHeaderMapImpl request_headers{{":method", "GET"}, - {":path", "/test/long/url"}, + {":path", path}, {":scheme", "http"}, {":authority", "host"}, {"x-lyft-user-id", "123"}}; @@ -363,8 +363,8 @@ HttpIntegrationTest::makeHeaderOnlyRequest(ConnectionCreationFunction* create_co } void HttpIntegrationTest::testRouterHeaderOnlyRequestAndResponse( - ConnectionCreationFunction* create_connection, int upstream_index) { - auto response = makeHeaderOnlyRequest(create_connection, upstream_index); + ConnectionCreationFunction* create_connection, int upstream_index, const std::string& path) { + auto response = makeHeaderOnlyRequest(create_connection, upstream_index, path); checkSimpleRequestSuccess(0U, 0U, response.get()); } diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 7111838439f84..3fb328ac2be56 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -139,7 +139,8 @@ class HttpIntegrationTest : public BaseIntegrationTest { typedef std::function ConnectionCreationFunction; // Sends a simple header-only HTTP request, and waits for a response. IntegrationStreamDecoderPtr makeHeaderOnlyRequest(ConnectionCreationFunction* create_connection, - int upstream_index); + int upstream_index, + const std::string& path = "/test/long/url"); void testRouterNotFound(); void testRouterNotFoundWithBody(); @@ -147,7 +148,8 @@ class HttpIntegrationTest : public BaseIntegrationTest { bool big_header, ConnectionCreationFunction* creator = nullptr); void testRouterHeaderOnlyRequestAndResponse(ConnectionCreationFunction* creator = nullptr, - int upstream_index = 0); + int upstream_index = 0, + const std::string& path = "/test/long/url"); void testRequestAndResponseShutdownWithActiveConnection(); // Disconnect tests diff --git a/test/integration/integration.cc b/test/integration/integration.cc index e9499226ff344..2c0948acf7b63 100644 --- a/test/integration/integration.cc +++ b/test/integration/integration.cc @@ -525,4 +525,51 @@ AssertionResult BaseIntegrationTest::compareDiscoveryRequest( } return AssertionSuccess(); } + +AssertionResult BaseIntegrationTest::compareDeltaDiscoveryRequest( + const std::string& expected_type_url, + const std::vector& expected_resource_subscriptions, + const std::vector& expected_resource_unsubscriptions, + const Protobuf::int32 expected_error_code, const std::string& expected_error_message) { + envoy::api::v2::DeltaDiscoveryRequest request; + VERIFY_ASSERTION(xds_stream_->waitForGrpcMessage(*dispatcher_, request)); + + EXPECT_TRUE(request.has_node()); + EXPECT_FALSE(request.node().id().empty()); + EXPECT_FALSE(request.node().cluster().empty()); + + // TODO(PiotrSikora): Remove this hack once fixed internally. + if (!(expected_type_url == request.type_url())) { + return AssertionFailure() << fmt::format("type_url {} does not match expected {}", + request.type_url(), expected_type_url); + } + if (!(expected_error_code == request.error_detail().code())) { + return AssertionFailure() << fmt::format("error_code {} does not match expected {}", + request.error_detail().code(), expected_error_code); + } + EXPECT_TRUE(IsSubstring("", "", expected_error_message, request.error_detail().message())); + + const std::vector resource_subscriptions(request.resource_names_subscribe().cbegin(), + request.resource_names_subscribe().cend()); + if (expected_resource_subscriptions != resource_subscriptions) { + return AssertionFailure() << fmt::format( + "newly subscribed resources {} do not match expected {} in {}", + fmt::join(resource_subscriptions.begin(), resource_subscriptions.end(), ","), + fmt::join(expected_resource_subscriptions.begin(), + expected_resource_subscriptions.end(), ","), + request.DebugString()); + } + const std::vector resource_unsubscriptions( + request.resource_names_unsubscribe().cbegin(), request.resource_names_unsubscribe().cend()); + if (expected_resource_unsubscriptions != resource_unsubscriptions) { + return AssertionFailure() << fmt::format( + "newly UNsubscribed resources {} do not match expected {} in {}", + fmt::join(resource_unsubscriptions.begin(), resource_unsubscriptions.end(), ","), + fmt::join(expected_resource_unsubscriptions.begin(), + expected_resource_unsubscriptions.end(), ","), + request.DebugString()); + } + return AssertionSuccess(); +} + } // namespace Envoy diff --git a/test/integration/integration.h b/test/integration/integration.h index d6d03e98f9a22..db2879b624be5 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -215,6 +215,29 @@ class BaseIntegrationTest : Logger::Loggable { xds_stream_->sendGrpcMessage(discovery_response); } + AssertionResult compareDeltaDiscoveryRequest( + const std::string& expected_type_url, + const std::vector& expected_resource_subscriptions, + const std::vector& expected_resource_unsubscriptions, + const Protobuf::int32 expected_error_code = Grpc::Status::GrpcStatus::Ok, + const std::string& expected_error_message = ""); + template + void sendDeltaDiscoveryResponse(const std::vector& added_or_updated, + const std::vector& removed, + const std::string& version) { + envoy::api::v2::DeltaDiscoveryResponse response; + response.set_system_version_info("system_version_info_this_is_a_test"); + for (const auto& message : added_or_updated) { + auto* resource = response.add_resources(); + resource->set_name(message.name()); + resource->set_version(version); + resource->mutable_resource()->PackFrom(message); + } + *response.mutable_removed_resources() = {removed.begin(), removed.end()}; + response.set_nonce("noncense"); + xds_stream_->sendGrpcMessage(response); + } + private: Event::GlobalTimeSystem time_system_; diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index 99014dad6defb..f6d8677642766 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -26,9 +26,14 @@ class MockSubscriptionCallbacks : public SubscriptionCallbacks { } template static std::string resourceName_(const T& resource) { return resource.name(); } + // TODO(fredlas) deduplicate MOCK_METHOD2_T(onConfigUpdate, void(const typename SubscriptionCallbacks::ResourceVector& resources, const std::string& version_info)); + MOCK_METHOD3_T(onConfigUpdate, + void(const Protobuf::RepeatedPtrField& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info)); MOCK_METHOD1_T(onConfigUpdateFailed, void(const EnvoyException* e)); MOCK_METHOD1_T(resourceName, std::string(const ProtobufWkt::Any& resource)); }; diff --git a/tools/spelling_dictionary.txt b/tools/spelling_dictionary.txt index 25d8ff3bd12d7..34a034702ad1a 100644 --- a/tools/spelling_dictionary.txt +++ b/tools/spelling_dictionary.txt @@ -183,6 +183,7 @@ RCU RDN RDS RDWR +REIMPLEMENT REQ RFC RHS @@ -258,6 +259,7 @@ WS Welford's Werror XDS +xDSes XFCC XFF XM