From 8adcf535a4292b41256653fa82af28000dcdf698 Mon Sep 17 00:00:00 2001 From: Dhi Aurrahman Date: Mon, 16 Jul 2018 08:25:31 +0700 Subject: [PATCH 1/4] Add load_assignment field implementation Signed-off-by: Dhi Aurrahman --- DEPRECATED.md | 3 +- api/envoy/api/v2/cds.proto | 9 +- api/envoy/api/v2/endpoint/endpoint.proto | 6 +- .../configuration/overview/v2_overview.rst | 33 +- .../intro/arch_overview/health_checking.rst | 31 + source/common/config/utility.cc | 15 + source/common/config/utility.h | 8 + source/common/upstream/eds.cc | 2 +- source/common/upstream/logical_dns_cluster.cc | 43 +- source/common/upstream/logical_dns_cluster.h | 42 +- source/common/upstream/upstream_impl.cc | 173 +++--- source/common/upstream/upstream_impl.h | 38 +- test/common/upstream/BUILD | 2 + .../upstream/cluster_manager_impl_test.cc | 2 +- .../upstream/logical_dns_cluster_test.cc | 318 ++++++++--- test/common/upstream/upstream_impl_test.cc | 536 +++++++++++++++++- 16 files changed, 1017 insertions(+), 244 deletions(-) diff --git a/DEPRECATED.md b/DEPRECATED.md index 389c0ecd95104..4181c62f0b59e 100644 --- a/DEPRECATED.md +++ b/DEPRECATED.md @@ -10,7 +10,7 @@ A logged warning is expected for each deprecated item that is in deprecation win * Use of the v1 API is deprecated. See envoy-announce [email](https://groups.google.com/forum/#!topic/envoy-announce/oPnYMZw8H4U). -* Use of the legacy +* Use of the legacy [ratelimit.proto](https://github.com/envoyproxy/envoy/blob/b0a518d064c8255e0e20557a8f909b6ff457558f/source/common/ratelimit/ratelimit.proto) is deprecated, in favor of the proto defined in [date-plane-api](https://github.com/envoyproxy/envoy/blob/master/api/envoy/service/ratelimit/v2/rls.proto) @@ -18,6 +18,7 @@ A logged warning is expected for each deprecated item that is in deprecation win `use_data_plane_proto` boolean flag in the [ratelimit configuration](https://github.com/envoyproxy/envoy/blob/master/api/envoy/config/ratelimit/v2/rls.proto). 
However, when using the deprecated client a warning is logged. * Use of the --v2-config-only flag. +* Setting hosts via `hosts` field in `Cluster` is deprecated. Use `load_assignment` instead. ## Version 1.7.0 diff --git a/api/envoy/api/v2/cds.proto b/api/envoy/api/v2/cds.proto index f649dafe0fb63..ea19212772e94 100644 --- a/api/envoy/api/v2/cds.proto +++ b/api/envoy/api/v2/cds.proto @@ -160,7 +160,13 @@ message Cluster { // :ref:`STRICT_DNS` // or :ref:`LOGICAL_DNS`, // then hosts is required. - repeated core.Address hosts = 7; + // + // .. attention:: + // + // **This field is deprecated**. Set the + // :ref:`load_assignment` field instead. + // + repeated core.Address hosts = 7 [deprecated = true]; // Setting this is required for specifying members of // :ref:`STATIC`, @@ -176,7 +182,6 @@ message Cluster { // :ref:`endpoint assignments`. // Setting this overrides :ref:`hosts` values. // - // [#not-implemented-hide:] ClusterLoadAssignment load_assignment = 33; // Optional :ref:`active health checking ` diff --git a/api/envoy/api/v2/endpoint/endpoint.proto b/api/envoy/api/v2/endpoint/endpoint.proto index 6f4cad1ce66e9..bfdc6bcffe1aa 100644 --- a/api/envoy/api/v2/endpoint/endpoint.proto +++ b/api/envoy/api/v2/endpoint/endpoint.proto @@ -29,7 +29,7 @@ message Endpoint { // and will be resolved via DNS. core.Address address = 1; - // [#not-implemented-hide:] The optional health check configuration. + // The optional health check configuration. message HealthCheckConfig { // Optional alternative health check port value. // @@ -40,8 +40,8 @@ message Endpoint { uint32 port_value = 1; } - // [#not-implemented-hide:] The optional health check configuration is used as - // configuration for the health checker to contact the health checked host. + // The optional health check configuration is used as configuration for the + // health checker to contact the health checked host. // // .. 
attention:: // diff --git a/docs/root/configuration/overview/v2_overview.rst b/docs/root/configuration/overview/v2_overview.rst index feb1032a4d4a1..3b9b92226245a 100644 --- a/docs/root/configuration/overview/v2_overview.rst +++ b/docs/root/configuration/overview/v2_overview.rst @@ -95,7 +95,14 @@ A minimal fully static bootstrap config is provided below: connect_timeout: 0.25s type: STATIC lb_policy: ROUND_ROBIN - hosts: [{ socket_address: { address: 127.0.0.2, port_value: 1234 }}] + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.2 + port_value: 1234 Mostly static with dynamic EDS ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -150,7 +157,14 @@ on 127.0.0.3:5678 is provided below: type: STATIC lb_policy: ROUND_ROBIN http2_protocol_options: {} - hosts: [{ socket_address: { address: 127.0.0.3, port_value: 5678 }}] + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.3 + port_value: 5678 Notice above that *xds_cluster* is defined to point Envoy at the management server. Even in an otherwise completely dynamic configurations, some static resources need to @@ -214,7 +228,14 @@ below: type: STATIC lb_policy: ROUND_ROBIN http2_protocol_options: {} - hosts: [{ socket_address: { address: 127.0.0.3, port_value: 5678 }}] + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.3 + port_value: 5678 The management server could respond to LDS requests with: @@ -544,10 +565,10 @@ Management Server Unreachability -------------------------------- When Envoy instance looses connectivity with the management server, Envoy will latch on to -the previous configuration while actively retrying in the background to reestablish the -connection with the management server. +the previous configuration while actively retrying in the background to reestablish the +connection with the management server. 
-Envoy debug logs the fact that it is not able to establish a connection with the management server +Envoy debug logs the fact that it is not able to establish a connection with the management server every time it attempts a connection. :ref:`upstream_cx_connect_fail ` a cluster level statistic diff --git a/docs/root/intro/arch_overview/health_checking.rst b/docs/root/intro/arch_overview/health_checking.rst index 6928ac94d669b..017a5564e8f50 100644 --- a/docs/root/intro/arch_overview/health_checking.rst +++ b/docs/root/intro/arch_overview/health_checking.rst @@ -24,6 +24,37 @@ unhealthy, successes required before marking a host healthy, etc.): maintenance by setting the specified key to any value and waiting for traffic to drain. See :ref:`redis_key `. +.. _arch_overview_per_cluster_health_check_config: + +Per cluster member health check config +-------------------------------------- + +If active health checking is configured for an upstream cluster, a specific additional configuration +for each registered member can be specified by setting the +:ref:`health check config` +in :ref:`endpoint message` +of an :ref:`LbEndpoint` of each defined +:ref:`LocalityLbEndpoints` +in a :ref:`ClusterLoadAssignment`. + +An example of setting up :ref:`health check config` +to set a :ref:`cluster member`'s alternative health check +:ref:`port` is: + +.. code-block:: yaml + + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + health_check_config: + port_value: 8080 + + address: + socket_address: + address: localhost + port_value: 80 + .. 
_arch_overview_health_check_logging: Health check event logging diff --git a/source/common/config/utility.cc b/source/common/config/utility.cc index 007204761b8a2..37fac934f2ad5 100644 --- a/source/common/config/utility.cc +++ b/source/common/config/utility.cc @@ -225,5 +225,20 @@ Grpc::AsyncClientFactoryPtr Utility::factoryForGrpcApiConfigSource( return async_client_manager.factoryForGrpcService(grpc_service, scope, false); } +envoy::api::v2::ClusterLoadAssignment Utility::translateClusterHosts( + const Protobuf::RepeatedPtrField<envoy::api::v2::core::Address>& hosts) { + envoy::api::v2::ClusterLoadAssignment load_assignment; + envoy::api::v2::endpoint::LocalityLbEndpoints* locality_lb_endpoints = + load_assignment.add_endpoints(); + // Since this LocalityLbEndpoints is built from hosts list, set the default weight to 1. + locality_lb_endpoints->mutable_load_balancing_weight()->set_value(1); + for (const envoy::api::v2::core::Address& host : hosts) { + envoy::api::v2::endpoint::LbEndpoint* lb_endpoint = locality_lb_endpoints->add_lb_endpoints(); + lb_endpoint->mutable_endpoint()->mutable_address()->MergeFrom(host); + lb_endpoint->mutable_load_balancing_weight()->set_value(1); + } + return load_assignment; +} + } // namespace Config } // namespace Envoy diff --git a/source/common/config/utility.h b/source/common/config/utility.h index 32797c4f627ba..79086504ad94b 100644 --- a/source/common/config/utility.h +++ b/source/common/config/utility.h @@ -288,6 +288,14 @@ class Utility { factoryForGrpcApiConfigSource(Grpc::AsyncClientManager& async_client_manager, const envoy::api::v2::core::ApiConfigSource& api_config_source, Stats::Scope& scope); + + /** + * Translate a set of cluster's hosts into a load assignment configuration. + * @param hosts cluster's list of hosts. + * @return envoy::api::v2::ClusterLoadAssignment a load assignment configuration. 
+ */ + static envoy::api::v2::ClusterLoadAssignment + translateClusterHosts(const Protobuf::RepeatedPtrField<envoy::api::v2::core::Address>& hosts); }; } // namespace Config diff --git a/source/common/upstream/eds.cc b/source/common/upstream/eds.cc index 4fb00c0d2e40b..46435a6de4d3c 100644 --- a/source/common/upstream/eds.cc +++ b/source/common/upstream/eds.cc @@ -140,7 +140,7 @@ bool EdsClusterImpl::updateHostsPerLocality(const uint32_t priority, const HostV info_->name(), host_set.hosts().size(), host_set.priority()); priority_state_manager.updateClusterPrioritySet(priority, std::move(current_hosts_copy), - hosts_added, hosts_removed); + hosts_added, hosts_removed, absl::nullopt); return true; } return false; diff --git a/source/common/upstream/logical_dns_cluster.cc b/source/common/upstream/logical_dns_cluster.cc index 7245af7e31565..21b56669320bb 100644 --- a/source/common/upstream/logical_dns_cluster.cc +++ b/source/common/upstream/logical_dns_cluster.cc @@ -18,6 +18,7 @@ namespace Upstream { LogicalDnsCluster::LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, + const LocalInfo::LocalInfo& local_info, Network::DnsResolverSharedPtr dns_resolver, ThreadLocal::SlotAllocator& tls, ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api) @@ -27,10 +28,17 @@ LogicalDnsCluster::LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, dns_refresh_rate_ms_( std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(cluster, dns_refresh_rate, 5000))), tls_(tls.allocateSlot()), - resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })) { - const auto& hosts = cluster.hosts(); - if (hosts.size() != 1) { - throw EnvoyException("logical_dns clusters must have a single host"); + resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })), + local_info_(local_info), + load_assignment_(cluster.has_load_assignment() + ? 
cluster.load_assignment() + : Config::Utility::translateClusterHosts(cluster.hosts())) { + const auto& locality_lb_endpoints = load_assignment_.endpoints(); + if (locality_lb_endpoints.size() != 1 || locality_lb_endpoints[0].lb_endpoints().size() != 1) { + throw EnvoyException(fmt::format("LOGICAL_DNS clusters must have {}", + cluster.has_load_assignment() + ? "a single locality_lb_endpoint and a single lb_endpoint" + : "a single host")); } switch (cluster.dns_lookup_family()) { @@ -47,7 +55,8 @@ LogicalDnsCluster::LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, NOT_REACHED; } - const auto& socket_address = hosts[0].socket_address(); + const envoy::api::v2::core::SocketAddress& socket_address = + lbEndpoint().endpoint().address().socket_address(); dns_url_ = fmt::format("tcp://{}:{}", socket_address.address(), socket_address.port_value()); hostname_ = Network::Utility::hostFromTcpUrl(dns_url_); Network::Utility::portFromTcpUrl(dns_url_); @@ -88,7 +97,8 @@ void LogicalDnsCluster::startResolve() { current_resolved_address_ = new_address; // Capture URL to avoid a race with another update. tls_->runOnAllThreads([this, new_address]() -> void { - tls_->getTyped().current_resolved_address_ = new_address; + PerThreadCurrentHostData& data = tls_->getTyped(); + data.current_resolved_address_ = new_address; }); } @@ -107,14 +117,16 @@ void LogicalDnsCluster::startResolve() { new LogicalHost(info_, hostname_, Network::Utility::getIpv6AnyAddress(), *this)); break; } - HostVectorSharedPtr new_hosts(new HostVector()); - new_hosts->emplace_back(logical_host_); - // Given the current config, only EDS clusters support multiple priorities. 
- ASSERT(priority_set_.hostSetsPerPriority().size() == 1); - auto& first_host_set = priority_set_.getOrCreateHostSet(0); - first_host_set.updateHosts(new_hosts, createHealthyHostList(*new_hosts), - HostsPerLocalityImpl::empty(), HostsPerLocalityImpl::empty(), - {}, *new_hosts, {}); + const auto& locality_lb_endpoint = localityLbEndpoint(); + PriorityStateManager priority_state_manager(*this, local_info_); + priority_state_manager.initializePriorityFor(locality_lb_endpoint); + priority_state_manager.registerHostForPriority(logical_host_, locality_lb_endpoint, + lbEndpoint(), absl::nullopt); + + const uint32_t priority = locality_lb_endpoint.priority(); + priority_state_manager.updateClusterPrioritySet( + priority, std::move(priority_state_manager.priorityState()[priority].first), + absl::nullopt, absl::nullopt, absl::nullopt); } } @@ -131,7 +143,8 @@ Upstream::Host::CreateConnectionData LogicalDnsCluster::LogicalHost::createConne return {HostImpl::createConnection(dispatcher, *parent_.info_, data.current_resolved_address_, options), HostDescriptionConstSharedPtr{ - new RealHostDescription(data.current_resolved_address_, shared_from_this())}}; + new RealHostDescription(data.current_resolved_address_, parent_.localityLbEndpoint(), + parent_.lbEndpoint(), shared_from_this())}}; } } // namespace Upstream diff --git a/source/common/upstream/logical_dns_cluster.h b/source/common/upstream/logical_dns_cluster.h index fa0a949554727..2feec15579b78 100644 --- a/source/common/upstream/logical_dns_cluster.h +++ b/source/common/upstream/logical_dns_cluster.h @@ -30,6 +30,7 @@ class LogicalDnsCluster : public ClusterImplBase { public: LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, + const LocalInfo::LocalInfo& local_info, Network::DnsResolverSharedPtr dns_resolver, ThreadLocal::SlotAllocator& tls, ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api); @@ -42,9 
+43,10 @@ class LogicalDnsCluster : public ClusterImplBase { struct LogicalHost : public HostImpl { LogicalHost(ClusterInfoConstSharedPtr cluster, const std::string& hostname, Network::Address::InstanceConstSharedPtr address, LogicalDnsCluster& parent) - : HostImpl(cluster, hostname, address, envoy::api::v2::core::Metadata::default_instance(), - 1, envoy::api::v2::core::Locality().default_instance(), - envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance()), + : HostImpl(cluster, hostname, address, parent.lbEndpoint().metadata(), + parent.lbEndpoint().load_balancing_weight().value(), + parent.localityLbEndpoint().locality(), + parent.lbEndpoint().endpoint().health_check_config()), parent_(parent) {} // Upstream::Host @@ -57,10 +59,17 @@ class LogicalDnsCluster : public ClusterImplBase { struct RealHostDescription : public HostDescription { RealHostDescription(Network::Address::InstanceConstSharedPtr address, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint, HostConstSharedPtr logical_host) : address_(address), logical_host_(logical_host), - metadata_(std::make_shared( - envoy::api::v2::core::Metadata::default_instance())) {} + metadata_(std::make_shared(lb_endpoint.metadata())), + health_check_address_( + lb_endpoint.endpoint().health_check_config().port_value() == 0 + ? 
address + : Network::Utility::getAddressWithPort( + *address, lb_endpoint.endpoint().health_check_config().port_value())), + locality_lb_endpoint_(locality_lb_endpoint), lb_endpoint_(lb_endpoint) {} // Upstream:HostDescription bool canary() const override { return false; } @@ -81,21 +90,36 @@ class LogicalDnsCluster : public ClusterImplBase { const std::string& hostname() const override { return logical_host_->hostname(); } Network::Address::InstanceConstSharedPtr address() const override { return address_; } const envoy::api::v2::core::Locality& locality() const override { - return envoy::api::v2::core::Locality().default_instance(); + return locality_lb_endpoint_.locality(); } - // TODO(dio): To support different address port. Network::Address::InstanceConstSharedPtr healthCheckAddress() const override { - return address_; + return health_check_address_; } + uint32_t priority() const { return locality_lb_endpoint_.priority(); } Network::Address::InstanceConstSharedPtr address_; HostConstSharedPtr logical_host_; const std::shared_ptr metadata_; + Network::Address::InstanceConstSharedPtr health_check_address_; + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint_; + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint_; }; struct PerThreadCurrentHostData : public ThreadLocal::ThreadLocalObject { Network::Address::InstanceConstSharedPtr current_resolved_address_; }; + const envoy::api::v2::endpoint::LocalityLbEndpoints& localityLbEndpoint() const { + // This is checked in the constructor, i.e. at config load time. + ASSERT(load_assignment_.endpoints().size() == 1); + return load_assignment_.endpoints()[0]; + } + + const envoy::api::v2::endpoint::LbEndpoint& lbEndpoint() const { + // This is checked in the constructor, i.e. at config load time. 
+ ASSERT(localityLbEndpoint().lb_endpoints().size() == 1); + return localityLbEndpoint().lb_endpoints()[0]; + } + void startResolve(); // ClusterImplBase @@ -111,6 +135,8 @@ class LogicalDnsCluster : public ClusterImplBase { Network::Address::InstanceConstSharedPtr current_resolved_address_; HostSharedPtr logical_host_; Network::ActiveDnsQuery* active_dns_query_{}; + const LocalInfo::LocalInfo& local_info_; + const envoy::api::v2::ClusterLoadAssignment load_assignment_; }; } // namespace Upstream diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 5a80405a20572..8e0a7de4f46cf 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -382,17 +382,17 @@ ClusterSharedPtr ClusterImplBase::create( switch (cluster.type()) { case envoy::api::v2::Cluster::STATIC: - new_cluster.reset( - new StaticClusterImpl(cluster, runtime, stats, ssl_context_manager, cm, added_via_api)); + new_cluster.reset(new StaticClusterImpl(cluster, runtime, stats, ssl_context_manager, + local_info, cm, added_via_api)); break; case envoy::api::v2::Cluster::STRICT_DNS: new_cluster.reset(new StrictDnsClusterImpl(cluster, runtime, stats, ssl_context_manager, - selected_dns_resolver, cm, dispatcher, + local_info, selected_dns_resolver, cm, dispatcher, added_via_api)); break; case envoy::api::v2::Cluster::LOGICAL_DNS: new_cluster.reset(new LogicalDnsCluster(cluster, runtime, stats, ssl_context_manager, - selected_dns_resolver, tls, cm, dispatcher, + local_info, selected_dns_resolver, tls, cm, dispatcher, added_via_api)); break; case envoy::api::v2::Cluster::ORIGINAL_DST: @@ -666,27 +666,37 @@ void PriorityStateManager::registerHostForPriority( const std::string& hostname, Network::Address::InstanceConstSharedPtr address, const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint, - const Upstream::Host::HealthFlag health_checker_flag) { + 
const absl::optional health_checker_flag) { + const HostSharedPtr host(new HostImpl(parent_.info(), hostname, address, lb_endpoint.metadata(), + lb_endpoint.load_balancing_weight().value(), + locality_lb_endpoint.locality(), + lb_endpoint.endpoint().health_check_config())); + registerHostForPriority(host, locality_lb_endpoint, lb_endpoint, health_checker_flag); +} + +void PriorityStateManager::registerHostForPriority( + const HostSharedPtr& host, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint, + const absl::optional health_checker_flag) { const uint32_t priority = locality_lb_endpoint.priority(); // Should be called after initializePriorityFor. ASSERT(priority_state_[priority].first); - priority_state_[priority].first->emplace_back( - new HostImpl(parent_.info(), hostname, address, lb_endpoint.metadata(), - lb_endpoint.load_balancing_weight().value(), locality_lb_endpoint.locality(), - lb_endpoint.endpoint().health_check_config())); - - const auto& health_status = lb_endpoint.health_status(); - if (health_status == envoy::api::v2::core::HealthStatus::UNHEALTHY || - health_status == envoy::api::v2::core::HealthStatus::DRAINING || - health_status == envoy::api::v2::core::HealthStatus::TIMEOUT) { - priority_state_[priority].first->back()->healthFlagSet(health_checker_flag); + priority_state_[priority].first->emplace_back(host); + if (health_checker_flag.has_value()) { + const auto& health_status = lb_endpoint.health_status(); + if (health_status == envoy::api::v2::core::HealthStatus::UNHEALTHY || + health_status == envoy::api::v2::core::HealthStatus::DRAINING || + health_status == envoy::api::v2::core::HealthStatus::TIMEOUT) { + priority_state_[priority].first->back()->healthFlagSet(health_checker_flag.value()); + } } } void PriorityStateManager::updateClusterPrioritySet( const uint32_t priority, HostVectorSharedPtr&& current_hosts, - const absl::optional& hosts_added, - const 
absl::optional& hosts_removed) { + const absl::optional& hosts_added, const absl::optional& hosts_removed, + const absl::optional health_checker_flag) { // If local locality is not defined then skip populating per locality hosts. const auto& local_locality = local_info_node_.locality(); ENVOY_LOG(trace, "Local locality: {}", local_locality.DebugString()); @@ -710,9 +720,12 @@ void PriorityStateManager::updateClusterPrioritySet( std::map hosts_per_locality; for (const HostSharedPtr& host : *hosts) { - // TODO(dio): Take into consideration when a non-EDS cluster has active health checking, i.e. to - // mark all the hosts unhealthy (host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC)) and - // then fire update callbacks to start the health checking process. + // Take into consideration when a non-EDS cluster has active health checking, i.e. to mark all + // the hosts unhealthy (host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC)) and then fire + // update callbacks to start the health checking process. 
+ if (health_checker_flag.has_value()) { + host->healthFlagSet(health_checker_flag.value()); + } hosts_per_locality[host->locality()].push_back(host); } @@ -754,36 +767,41 @@ void PriorityStateManager::updateClusterPrioritySet( StaticClusterImpl::StaticClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, - Ssl::ContextManager& ssl_context_manager, ClusterManager& cm, + Ssl::ContextManager& ssl_context_manager, + const LocalInfo::LocalInfo& local_info, ClusterManager& cm, bool added_via_api) : ClusterImplBase(cluster, cm.bindConfig(), runtime, stats, ssl_context_manager, cm.clusterManagerFactory().secretManager(), added_via_api), - initial_hosts_(new HostVector()) { - - for (const auto& host : cluster.hosts()) { - initial_hosts_->emplace_back(HostSharedPtr{new HostImpl( - info_, "", resolveProtoAddress(host), envoy::api::v2::core::Metadata::default_instance(), 1, - envoy::api::v2::core::Locality().default_instance(), - envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance())}); + priority_state_manager_(new PriorityStateManager(*this, local_info)) { + // TODO(dio): Use by-reference when cluster.hosts() is removed. + const envoy::api::v2::ClusterLoadAssignment cluster_load_assignment( + cluster.has_load_assignment() ? cluster.load_assignment() + : Config::Utility::translateClusterHosts(cluster.hosts())); + + for (const auto& locality_lb_endpoint : cluster_load_assignment.endpoints()) { + priority_state_manager_->initializePriorityFor(locality_lb_endpoint); + for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { + priority_state_manager_->registerHostForPriority( + "", resolveProtoAddress(lb_endpoint.endpoint().address()), locality_lb_endpoint, + lb_endpoint, absl::nullopt); + } } } void StaticClusterImpl::startPreInit() { - // At this point see if we have a health checker. If so, mark all the hosts unhealthy and then - // fire update callbacks to start the health checking process. 
- if (health_checker_) { - for (const auto& host : *initial_hosts_) { - host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); - } + // At this point see if we have a health checker. If so, mark all the hosts unhealthy and + // then fire update callbacks to start the health checking process. + const auto& health_checker_flag = + health_checker_ != nullptr + ? absl::optional(Host::HealthFlag::FAILED_ACTIVE_HC) + : absl::nullopt; + + auto& priority_state = priority_state_manager_->priorityState(); + for (size_t i = 0; i < priority_state.size(); ++i) { + priority_state_manager_->updateClusterPrioritySet( + i, std::move(priority_state[i].first), absl::nullopt, absl::nullopt, health_checker_flag); } - - // Given the current config, only EDS clusters support multiple priorities. - ASSERT(priority_set_.hostSetsPerPriority().size() == 1); - auto& first_host_set = priority_set_.getOrCreateHostSet(0); - first_host_set.updateHosts(initial_hosts_, createHealthyHostList(*initial_hosts_), - HostsPerLocalityImpl::empty(), HostsPerLocalityImpl::empty(), {}, - *initial_hosts_, {}); - initial_hosts_ = nullptr; + priority_state_manager_.reset(); onPreInitComplete(); } @@ -934,12 +952,13 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts, StrictDnsClusterImpl::StrictDnsClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, + const LocalInfo::LocalInfo& local_info, Network::DnsResolverSharedPtr dns_resolver, ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api) : BaseDynamicClusterImpl(cluster, cm.bindConfig(), runtime, stats, ssl_context_manager, cm.clusterManagerFactory().secretManager(), added_via_api), - dns_resolver_(dns_resolver), + local_info_(local_info), dns_resolver_(dns_resolver), dns_refresh_rate_ms_( std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(cluster, dns_refresh_rate, 5000))) { switch (cluster.dns_lookup_family()) { @@ -956,11 
+975,18 @@ StrictDnsClusterImpl::StrictDnsClusterImpl(const envoy::api::v2::Cluster& cluste NOT_REACHED; } - for (const auto& host : cluster.hosts()) { - resolve_targets_.emplace_back( - new ResolveTarget(*this, dispatcher, - fmt::format("tcp://{}:{}", host.socket_address().address(), - host.socket_address().port_value()))); + const envoy::api::v2::ClusterLoadAssignment load_assignment( + cluster.has_load_assignment() ? cluster.load_assignment() + : Config::Utility::translateClusterHosts(cluster.hosts())); + const auto& locality_lb_endpoints = load_assignment.endpoints(); + for (const auto& locality_lb_endpoint : locality_lb_endpoints) { + for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { + const auto& host = lb_endpoint.endpoint().address(); + const std::string& url = fmt::format("tcp://{}:{}", host.socket_address().address(), + host.socket_address().port_value()); + resolve_targets_.emplace_back( + new ResolveTarget(*this, dispatcher, url, locality_lb_endpoint, lb_endpoint)); + } } } @@ -971,29 +997,34 @@ void StrictDnsClusterImpl::startPreInit() { } void StrictDnsClusterImpl::updateAllHosts(const HostVector& hosts_added, - const HostVector& hosts_removed) { + const HostVector& hosts_removed, + uint32_t current_priority) { + PriorityStateManager priority_state_manager(*this, local_info_); // At this point we know that we are different so make a new host list and notify. - HostVectorSharedPtr new_hosts(new HostVector()); for (const ResolveTargetPtr& target : resolve_targets_) { + priority_state_manager.initializePriorityFor(target->locality_lb_endpoint_); for (const HostSharedPtr& host : target->hosts_) { - new_hosts->emplace_back(host); + if (target->locality_lb_endpoint_.priority() == current_priority) { + priority_state_manager.registerHostForPriority(host, target->locality_lb_endpoint_, + target->lb_endpoint_, absl::nullopt); + } } } - // Given the current config, only EDS clusters support multiple priorities. 
- ASSERT(priority_set_.hostSetsPerPriority().size() == 1); - auto& first_host_set = priority_set_.getOrCreateHostSet(0); - first_host_set.updateHosts(new_hosts, createHealthyHostList(*new_hosts), - HostsPerLocalityImpl::empty(), HostsPerLocalityImpl::empty(), {}, - hosts_added, hosts_removed); + // TODO(dio): Add assertion in here. + priority_state_manager.updateClusterPrioritySet( + current_priority, std::move(priority_state_manager.priorityState()[current_priority].first), + hosts_added, hosts_removed, absl::nullopt); } -StrictDnsClusterImpl::ResolveTarget::ResolveTarget(StrictDnsClusterImpl& parent, - Event::Dispatcher& dispatcher, - const std::string& url) +StrictDnsClusterImpl::ResolveTarget::ResolveTarget( + StrictDnsClusterImpl& parent, Event::Dispatcher& dispatcher, const std::string& url, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint) : parent_(parent), dns_address_(Network::Utility::hostFromTcpUrl(url)), port_(Network::Utility::portFromTcpUrl(url)), - resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })) {} + resolve_timer_(dispatcher.createTimer([this]() -> void { startResolve(); })), + locality_lb_endpoint_(locality_lb_endpoint), lb_endpoint_(lb_endpoint) {} StrictDnsClusterImpl::ResolveTarget::~ResolveTarget() { if (active_query_) { @@ -1014,28 +1045,28 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() { HostVector new_hosts; for (const Network::Address::InstanceConstSharedPtr& address : address_list) { - // TODO(mattklein123): Currently the DNS interface does not consider port. We need to make - // a new address that has port in it. We need to both support IPv6 as well as potentially - // move port handling into the DNS interface itself, which would work better for SRV. + // TODO(mattklein123): Currently the DNS interface does not consider port. We need to + // make a new address that has port in it. 
We need to both support IPv6 as well as + // potentially move port handling into the DNS interface itself, which would work better + // for SRV. ASSERT(address != nullptr); new_hosts.emplace_back(new HostImpl( parent_.info_, dns_address_, Network::Utility::getAddressWithPort(*address, port_), - envoy::api::v2::core::Metadata::default_instance(), 1, - envoy::api::v2::core::Locality().default_instance(), - envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance())); + lb_endpoint_.metadata(), lb_endpoint_.load_balancing_weight().value(), + locality_lb_endpoint_.locality(), lb_endpoint_.endpoint().health_check_config())); } HostVector hosts_added; HostVector hosts_removed; if (parent_.updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed)) { ENVOY_LOG(debug, "DNS hosts have changed for {}", dns_address_); - parent_.updateAllHosts(hosts_added, hosts_removed); + parent_.updateAllHosts(hosts_added, hosts_removed, locality_lb_endpoint_.priority()); } // If there is an initialize callback, fire it now. Note that if the cluster refers to - // multiple DNS names, this will return initialized after a single DNS resolution completes. - // This is not perfect but is easier to code and unclear if the extra complexity is needed - // so will start with this. + // multiple DNS names, this will return initialized after a single DNS resolution + // completes. This is not perfect but is easier to code and unclear if the extra + // complexity is needed so will start with this. 
parent_.onPreInitComplete(); resolve_timer_->enableTimer(parent_.dns_refresh_rate_ms_); }); diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index f5ba7e83794ca..90b7935d55d44 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -541,18 +541,19 @@ class PriorityStateManager : protected Logger::Loggable { Network::Address::InstanceConstSharedPtr address, const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint, - const Upstream::Host::HealthFlag health_checker_flag); + const absl::optional health_checker_flag); - // TODO(dio): Add an override of registerHostForPriority to register a host to the PriorityState - // based on a specified priority. This will be useful for non-EDS cluster hosts setup. - // - // void registerHostForPriority(const HostSharedPtr& host, const uint32_t priority); + void + registerHostForPriority(const HostSharedPtr& host, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint, + const absl::optional health_checker_flag); - // Updates the cluster priority set. This should be called after the PriorityStateManager is - // initialized. - void updateClusterPrioritySet(const uint32_t priority, HostVectorSharedPtr&& current_hosts, - const absl::optional& hosts_added, - const absl::optional& hosts_removed); + void + updateClusterPrioritySet(const uint32_t priority, HostVectorSharedPtr&& current_hosts, + const absl::optional& hosts_added, + const absl::optional& hosts_removed, + const absl::optional health_checker_flag); // Returns the size of the current cluster priority state. 
size_t size() const { return priority_state_.size(); } @@ -566,6 +567,8 @@ class PriorityStateManager : protected Logger::Loggable { const envoy::api::v2::core::Node& local_info_node_; }; +typedef std::unique_ptr PriorityStateManagerPtr; + /** * Implementation of Upstream::Cluster for static clusters (clusters that have a fixed number of * hosts with resolved IP addresses). @@ -574,7 +577,7 @@ class StaticClusterImpl : public ClusterImplBase { public: StaticClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, - ClusterManager& cm, bool added_via_api); + const LocalInfo::LocalInfo& local_info, ClusterManager& cm, bool added_via_api); // Upstream::Cluster InitializePhase initializePhase() const override { return InitializePhase::Primary; } @@ -583,7 +586,7 @@ class StaticClusterImpl : public ClusterImplBase { // ClusterImplBase void startPreInit() override; - HostVectorSharedPtr initial_hosts_; + PriorityStateManagerPtr priority_state_manager_; }; /** @@ -605,6 +608,7 @@ class StrictDnsClusterImpl : public BaseDynamicClusterImpl { public: StrictDnsClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, + const LocalInfo::LocalInfo& local_info, Network::DnsResolverSharedPtr dns_resolver, ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api); @@ -614,7 +618,9 @@ class StrictDnsClusterImpl : public BaseDynamicClusterImpl { private: struct ResolveTarget { ResolveTarget(StrictDnsClusterImpl& parent, Event::Dispatcher& dispatcher, - const std::string& url); + const std::string& url, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint); ~ResolveTarget(); void startResolve(); @@ -624,15 +630,19 @@ class StrictDnsClusterImpl : public BaseDynamicClusterImpl { uint32_t port_; Event::TimerPtr resolve_timer_; 
HostVector hosts_; + const envoy::api::v2::endpoint::LocalityLbEndpoints locality_lb_endpoint_; + const envoy::api::v2::endpoint::LbEndpoint lb_endpoint_; }; typedef std::unique_ptr ResolveTargetPtr; - void updateAllHosts(const HostVector& hosts_added, const HostVector& hosts_removed); + void updateAllHosts(const HostVector& hosts_added, const HostVector& hosts_removed, + uint32_t priority); // ClusterImplBase void startPreInit() override; + const LocalInfo::LocalInfo& local_info_; Network::DnsResolverSharedPtr dns_resolver_; std::list resolve_targets_; const std::chrono::milliseconds dns_refresh_rate_ms_; diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index 067ad5734c4bd..441ce2437613f 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -172,6 +172,7 @@ envoy_cc_test( "//source/common/upstream:upstream_lib", "//source/extensions/transport_sockets/raw_buffer:config", "//test/mocks:common_lib", + "//test/mocks/local_info:local_info_mocks", "//test/mocks/network:network_mocks", "//test/mocks/runtime:runtime_mocks", "//test/mocks/ssl:ssl_mocks", @@ -326,6 +327,7 @@ envoy_cc_test( "//source/common/upstream:upstream_lib", "//source/extensions/transport_sockets/raw_buffer:config", "//test/mocks:common_lib", + "//test/mocks/local_info:local_info_mocks", "//test/mocks/network:network_mocks", "//test/mocks/runtime:runtime_mocks", "//test/mocks/ssl:ssl_mocks", diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 5afbfac84911a..8082c0c090c0a 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -122,7 +122,7 @@ class TestClusterManagerFactory : public ClusterManagerFactory { NiceMock random_; Ssl::ContextManagerImpl ssl_context_manager_{runtime_}; NiceMock dispatcher_; - LocalInfo::MockLocalInfo local_info_; + NiceMock local_info_; Secret::MockSecretManager secret_manager_; }; diff --git 
a/test/common/upstream/logical_dns_cluster_test.cc b/test/common/upstream/logical_dns_cluster_test.cc index df74b6f1aba4f..80a5c7b705fc5 100644 --- a/test/common/upstream/logical_dns_cluster_test.cc +++ b/test/common/upstream/logical_dns_cluster_test.cc @@ -9,6 +9,7 @@ #include "test/common/upstream/utility.h" #include "test/mocks/common.h" +#include "test/mocks/local_info/mocks.h" #include "test/mocks/network/mocks.h" #include "test/mocks/runtime/mocks.h" #include "test/mocks/ssl/mocks.h" @@ -26,14 +27,16 @@ using testing::_; namespace Envoy { namespace Upstream { +enum class ConfigType { V2_YAML, V1_JSON }; + class LogicalDnsClusterTest : public testing::Test { public: - void setup(const std::string& json) { + void setupFromV1Json(const std::string& json) { resolve_timer_ = new Event::MockTimer(&dispatcher_); NiceMock cm; cluster_.reset(new LogicalDnsCluster(parseClusterFromJson(json), runtime_, stats_store_, - ssl_context_manager_, dns_resolver_, tls_, cm, dispatcher_, - false)); + ssl_context_manager_, local_info_, dns_resolver_, tls_, cm, + dispatcher_, false)); cluster_->prioritySet().addMemberUpdateCb( [&](uint32_t, const HostVector&, const HostVector&) -> void { membership_updated_.ready(); @@ -41,8 +44,22 @@ class LogicalDnsClusterTest : public testing::Test { cluster_->initialize([&]() -> void { initialized_.ready(); }); } - void expectResolve(Network::DnsLookupFamily dns_lookup_family) { - EXPECT_CALL(*dns_resolver_, resolve("foo.bar.com", dns_lookup_family, _)) + void setupFromV2Yaml(const std::string& yaml) { + resolve_timer_ = new Event::MockTimer(&dispatcher_); + NiceMock cm; + cluster_.reset(new LogicalDnsCluster(parseClusterFromV2Yaml(yaml), runtime_, stats_store_, + ssl_context_manager_, local_info_, dns_resolver_, tls_, cm, + dispatcher_, false)); + cluster_->prioritySet().addMemberUpdateCb( + [&](uint32_t, const HostVector&, const HostVector&) -> void { + membership_updated_.ready(); + }); + cluster_->initialize([&]() -> void { 
initialized_.ready(); }); + } + + void expectResolve(Network::DnsLookupFamily dns_lookup_family, + const std::string& expected_address) { + EXPECT_CALL(*dns_resolver_, resolve(expected_address, dns_lookup_family, _)) .WillOnce(Invoke([&](const std::string&, Network::DnsLookupFamily, Network::DnsResolver::ResolveCb cb) -> Network::ActiveDnsQuery* { dns_callback_ = cb; @@ -50,6 +67,102 @@ class LogicalDnsClusterTest : public testing::Test { })); } + void testBasicSetup(const std::string& config, const std::string& expected_address, + ConfigType config_type = ConfigType::V2_YAML) { + expectResolve(Network::DnsLookupFamily::V4Only, expected_address); + if (config_type == ConfigType::V1_JSON) { + setupFromV1Json(config); + } else { + setupFromV2Yaml(config); + } + + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + EXPECT_CALL(*resolve_timer_, enableTimer(std::chrono::milliseconds(4000))); + dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2"})); + + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ(1UL, + cluster_->prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); + EXPECT_EQ( + 1UL, + cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); + EXPECT_EQ(cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0], + cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts()[0]); + HostSharedPtr logical_host = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]; + + EXPECT_CALL(dispatcher_, + createClientConnection_( + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")), _, _, _)) + .WillOnce(Return(new NiceMock())); + logical_host->createConnection(dispatcher_, nullptr); + logical_host->outlierDetector().putHttpResponseCode(200); + + expectResolve(Network::DnsLookupFamily::V4Only, expected_address); + 
resolve_timer_->callback_(); + + // Should not cause any changes. + EXPECT_CALL(*resolve_timer_, enableTimer(_)); + dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2", "127.0.0.3"})); + + EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); + EXPECT_CALL(dispatcher_, + createClientConnection_( + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")), _, _, _)) + .WillOnce(Return(new NiceMock())); + Host::CreateConnectionData data = logical_host->createConnection(dispatcher_, nullptr); + EXPECT_FALSE(data.host_description_->canary()); + EXPECT_EQ(&cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]->cluster(), + &data.host_description_->cluster()); + EXPECT_EQ(&cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]->stats(), + &data.host_description_->stats()); + EXPECT_EQ("127.0.0.1:443", data.host_description_->address()->asString()); + EXPECT_EQ("", data.host_description_->locality().region()); + EXPECT_EQ("", data.host_description_->locality().zone()); + EXPECT_EQ("", data.host_description_->locality().sub_zone()); + EXPECT_EQ("foo.bar.com", data.host_description_->hostname()); + EXPECT_TRUE(TestUtility::protoEqual(envoy::api::v2::core::Metadata::default_instance(), + *data.host_description_->metadata())); + data.host_description_->outlierDetector().putHttpResponseCode(200); + data.host_description_->healthChecker().setUnhealthy(); + + expectResolve(Network::DnsLookupFamily::V4Only, expected_address); + resolve_timer_->callback_(); + + // Should cause a change. 
+ EXPECT_CALL(*resolve_timer_, enableTimer(_)); + dns_callback_(TestUtility::makeDnsResponse({"127.0.0.3", "127.0.0.1", "127.0.0.2"})); + + EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); + EXPECT_CALL(dispatcher_, + createClientConnection_( + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")), _, _, _)) + .WillOnce(Return(new NiceMock())); + logical_host->createConnection(dispatcher_, nullptr); + + expectResolve(Network::DnsLookupFamily::V4Only, expected_address); + resolve_timer_->callback_(); + + // Empty should not cause any change. + EXPECT_CALL(*resolve_timer_, enableTimer(_)); + dns_callback_({}); + + EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); + EXPECT_CALL(dispatcher_, + createClientConnection_( + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")), _, _, _)) + .WillOnce(Return(new NiceMock())); + logical_host->createConnection(dispatcher_, nullptr); + + // Make sure we cancel. 
+ EXPECT_CALL(active_dns_query_, cancel()); + expectResolve(Network::DnsLookupFamily::V4Only, expected_address); + resolve_timer_->callback_(); + + tls_.shutdownThread(); + } + Stats::IsolatedStoreImpl stats_store_; Ssl::MockContextManager ssl_context_manager_; std::shared_ptr> dns_resolver_{ @@ -63,6 +176,7 @@ class LogicalDnsClusterTest : public testing::Test { ReadyWatcher initialized_; NiceMock runtime_; NiceMock dispatcher_; + NiceMock local_info_; }; typedef std::tuple> @@ -127,7 +241,7 @@ TEST_P(LogicalDnsParamTest, ImmediateResolve) { cb(TestUtility::makeDnsResponse(std::get<2>(GetParam()))); return nullptr; })); - setup(json); + setupFromV1Json(json); EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); EXPECT_EQ("foo.bar.com", @@ -137,7 +251,7 @@ TEST_P(LogicalDnsParamTest, ImmediateResolve) { } TEST_F(LogicalDnsClusterTest, BadConfig) { - const std::string json = R"EOF( + const std::string multiple_hosts_json = R"EOF( { "name": "name", "connect_timeout_ms": 250, @@ -147,7 +261,72 @@ TEST_F(LogicalDnsClusterTest, BadConfig) { } )EOF"; - EXPECT_THROW(setup(json), EnvoyException); + EXPECT_THROW_WITH_MESSAGE(setupFromV1Json(multiple_hosts_json), EnvoyException, + "LOGICAL_DNS clusters must have a single host"); + + const std::string multiple_lb_endpoints_yaml = R"EOF( + name: name + type: LOGICAL_DNS + dns_refresh_rate: 4s + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + dns_lookup_family: V4_ONLY + load_assignment: + cluster_name: name + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: foo.bar.com + port_value: 443 + health_check_config: + port_value: 8000 + - endpoint: + address: + socket_address: + address: hello.world.com + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; + + EXPECT_THROW_WITH_MESSAGE( + setupFromV2Yaml(multiple_lb_endpoints_yaml), EnvoyException, + "LOGICAL_DNS 
clusters must have a single locality_lb_endpoint and a single lb_endpoint"); + + const std::string multiple_endpoints_yaml = R"EOF( + name: name + type: LOGICAL_DNS + dns_refresh_rate: 4s + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + dns_lookup_family: V4_ONLY + load_assignment: + cluster_name: name + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: foo.bar.com + port_value: 443 + health_check_config: + port_value: 8000 + + - lb_endpoints: + - endpoint: + address: + socket_address: + address: hello.world.com + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; + + EXPECT_THROW_WITH_MESSAGE( + setupFromV2Yaml(multiple_endpoints_yaml), EnvoyException, + "LOGICAL_DNS clusters must have a single locality_lb_endpoint and a single lb_endpoint"); } TEST_F(LogicalDnsClusterTest, Basic) { @@ -162,93 +341,46 @@ TEST_F(LogicalDnsClusterTest, Basic) { } )EOF"; - expectResolve(Network::DnsLookupFamily::V4Only); - setup(json); - - EXPECT_CALL(membership_updated_, ready()); - EXPECT_CALL(initialized_, ready()); - EXPECT_CALL(*resolve_timer_, enableTimer(std::chrono::milliseconds(4000))); - dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2"})); + const std::string basic_yaml_hosts = R"EOF( + name: name + type: LOGICAL_DNS + dns_refresh_rate: 4s + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + # Since the following expectResolve() requires Network::DnsLookupFamily::V4Only we need to set + # dns_lookup_family to V4_ONLY explicitly for v2 .yaml config. 
+ dns_lookup_family: V4_ONLY + hosts: + - socket_address: + address: foo.bar.com + port_value: 443 + )EOF"; - EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); - EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); - EXPECT_EQ(0UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); - EXPECT_EQ( - 0UL, - cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); - EXPECT_EQ(cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0], - cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts()[0]); - HostSharedPtr logical_host = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]; - - EXPECT_CALL(dispatcher_, - createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")), _, _, _)) - .WillOnce(Return(new NiceMock())); - logical_host->createConnection(dispatcher_, nullptr); - logical_host->outlierDetector().putHttpResponseCode(200); - - expectResolve(Network::DnsLookupFamily::V4Only); - resolve_timer_->callback_(); - - // Should not cause any changes. 
- EXPECT_CALL(*resolve_timer_, enableTimer(_)); - dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2", "127.0.0.3"})); - - EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); - EXPECT_CALL(dispatcher_, - createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443")), _, _, _)) - .WillOnce(Return(new NiceMock())); - Host::CreateConnectionData data = logical_host->createConnection(dispatcher_, nullptr); - EXPECT_FALSE(data.host_description_->canary()); - EXPECT_EQ(&cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]->cluster(), - &data.host_description_->cluster()); - EXPECT_EQ(&cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]->stats(), - &data.host_description_->stats()); - EXPECT_EQ("127.0.0.1:443", data.host_description_->address()->asString()); - EXPECT_EQ("", data.host_description_->locality().region()); - EXPECT_EQ("", data.host_description_->locality().zone()); - EXPECT_EQ("", data.host_description_->locality().sub_zone()); - EXPECT_EQ("foo.bar.com", data.host_description_->hostname()); - EXPECT_TRUE(TestUtility::protoEqual(envoy::api::v2::core::Metadata::default_instance(), - *data.host_description_->metadata())); - data.host_description_->outlierDetector().putHttpResponseCode(200); - data.host_description_->healthChecker().setUnhealthy(); - - expectResolve(Network::DnsLookupFamily::V4Only); - resolve_timer_->callback_(); - - // Should cause a change. 
- EXPECT_CALL(*resolve_timer_, enableTimer(_)); - dns_callback_(TestUtility::makeDnsResponse({"127.0.0.3", "127.0.0.1", "127.0.0.2"})); - - EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); - EXPECT_CALL(dispatcher_, - createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")), _, _, _)) - .WillOnce(Return(new NiceMock())); - logical_host->createConnection(dispatcher_, nullptr); - - expectResolve(Network::DnsLookupFamily::V4Only); - resolve_timer_->callback_(); - - // Empty should not cause any change. - EXPECT_CALL(*resolve_timer_, enableTimer(_)); - dns_callback_({}); - - EXPECT_EQ(logical_host, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()[0]); - EXPECT_CALL(dispatcher_, - createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443")), _, _, _)) - .WillOnce(Return(new NiceMock())); - logical_host->createConnection(dispatcher_, nullptr); - - // Make sure we cancel. - EXPECT_CALL(active_dns_query_, cancel()); - expectResolve(Network::DnsLookupFamily::V4Only); - resolve_timer_->callback_(); + const std::string basic_yaml_load_assignment = R"EOF( + name: name + type: LOGICAL_DNS + dns_refresh_rate: 4s + connect_timeout: 0.25s + lb_policy: ROUND_ROBIN + # Since the following expectResolve() requires Network::DnsLookupFamily::V4Only we need to set + # dns_lookup_family to V4_ONLY explicitly for v2 .yaml config. 
+ dns_lookup_family: V4_ONLY + load_assignment: + cluster_name: name + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: foo.bar.com + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; - tls_.shutdownThread(); + testBasicSetup(json, "foo.bar.com", ConfigType::V1_JSON); + testBasicSetup(basic_yaml_hosts, "foo.bar.com"); + testBasicSetup(basic_yaml_load_assignment, "foo.bar.com"); } } // namespace Upstream diff --git a/test/common/upstream/upstream_impl_test.cc b/test/common/upstream/upstream_impl_test.cc index 17888a9006304..f4b43c393d2d6 100644 --- a/test/common/upstream/upstream_impl_test.cc +++ b/test/common/upstream/upstream_impl_test.cc @@ -17,6 +17,7 @@ #include "test/common/upstream/utility.h" #include "test/mocks/common.h" +#include "test/mocks/local_info/mocks.h" #include "test/mocks/network/mocks.h" #include "test/mocks/runtime/mocks.h" #include "test/mocks/ssl/mocks.h" @@ -119,6 +120,7 @@ TEST_P(StrictDnsParamTest, ImmediateResolve) { auto dns_resolver = std::make_shared>(); NiceMock dispatcher; NiceMock runtime; + NiceMock local_info; ReadyWatcher initialized; const std::string json = R"EOF( @@ -141,7 +143,7 @@ TEST_P(StrictDnsParamTest, ImmediateResolve) { })); NiceMock cm; StrictDnsClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); cluster.initialize([&]() -> void { initialized.ready(); }); EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->hosts().size()); EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); @@ -155,6 +157,7 @@ TEST(StrictDnsClusterImplTest, ZeroHostsHealthChecker) { NiceMock dispatcher; NiceMock runtime; NiceMock cm; + NiceMock local_info; ReadyWatcher initialized; const std::string yaml = R"EOF( @@ -167,7 +170,7 @@ TEST(StrictDnsClusterImplTest, ZeroHostsHealthChecker) { ResolverData resolver(*dns_resolver, 
dispatcher); StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); std::shared_ptr health_checker(new MockHealthChecker()); EXPECT_CALL(*health_checker, start()); EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)); @@ -188,6 +191,7 @@ TEST(StrictDnsClusterImplTest, Basic) { auto dns_resolver = std::make_shared>(); NiceMock dispatcher; NiceMock runtime; + NiceMock local_info; // gmock matches in LIFO order which is why these are swapped. ResolverData resolver2(*dns_resolver, dispatcher); @@ -225,7 +229,7 @@ TEST(StrictDnsClusterImplTest, Basic) { NiceMock cm; StrictDnsClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.default.max_connections", 43)); EXPECT_EQ(43U, cluster.info()->resourceManager(ResourcePriority::Default).connections().max()); EXPECT_CALL(runtime.snapshot_, @@ -302,8 +306,8 @@ TEST(StrictDnsClusterImplTest, Basic) { ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); - EXPECT_EQ(0UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); - EXPECT_EQ(0UL, + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); for (const HostSharedPtr& host : cluster.prioritySet().hostSetsPerPriority()[0]->hosts()) { @@ -329,6 +333,7 @@ TEST(StrictDnsClusterImplTest, HostRemovalActiveHealthSkipped) { NiceMock dispatcher; NiceMock runtime; NiceMock cm; + NiceMock local_info; const std::string yaml = R"EOF( name: name @@ -341,7 +346,7 @@ 
TEST(StrictDnsClusterImplTest, HostRemovalActiveHealthSkipped) { ResolverData resolver(*dns_resolver, dispatcher); StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); std::shared_ptr health_checker(new MockHealthChecker()); EXPECT_CALL(*health_checker, start()); EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)); @@ -372,6 +377,298 @@ TEST(StrictDnsClusterImplTest, HostRemovalActiveHealthSkipped) { EXPECT_EQ(1UL, hosts.size()); } +TEST(StrictDnsClusterImplTest, LoadAssignmentBasic) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + auto dns_resolver = std::make_shared>(); + NiceMock dispatcher; + NiceMock runtime; + NiceMock local_info; + + // gmock matches in LIFO order which is why these are swapped. + ResolverData resolver2(*dns_resolver, dispatcher); + ResolverData resolver1(*dns_resolver, dispatcher); + + const std::string yaml = R"EOF( + name: name + type: STRICT_DNS + + dns_lookup_family: V4_ONLY + connect_timeout: 0.25s + dns_refresh_rate: 4s + + lb_policy: ROUND_ROBIN + + circuit_breakers: + thresholds: + - priority: DEFAULT + max_connections: 43 + max_pending_requests: 57 + max_requests: 50 + max_retries: 10 + - priority: HIGH + max_connections: 1 + max_pending_requests: 2 + max_requests: 3 + max_retries: 4 + + max_requests_per_connection: 3 + + http2_protocol_options: + hpack_table_size: 0 + + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: localhost1 + port_value: 11001 + health_check_config: + port_value: 8000 + - endpoint: + address: + socket_address: + address: localhost2 + port_value: 11002 + health_check_config: + port_value: 8000 + )EOF"; + + NiceMock cm; + StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, dns_resolver, cm, dispatcher, false); + 
EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.default.max_connections", 43)); + EXPECT_EQ(43U, cluster.info()->resourceManager(ResourcePriority::Default).connections().max()); + EXPECT_CALL(runtime.snapshot_, + getInteger("circuit_breakers.name.default.max_pending_requests", 57)); + EXPECT_EQ(57U, + cluster.info()->resourceManager(ResourcePriority::Default).pendingRequests().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.default.max_requests", 50)); + EXPECT_EQ(50U, cluster.info()->resourceManager(ResourcePriority::Default).requests().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.default.max_retries", 10)); + EXPECT_EQ(10U, cluster.info()->resourceManager(ResourcePriority::Default).retries().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.high.max_connections", 1)); + EXPECT_EQ(1U, cluster.info()->resourceManager(ResourcePriority::High).connections().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.high.max_pending_requests", 2)); + EXPECT_EQ(2U, cluster.info()->resourceManager(ResourcePriority::High).pendingRequests().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.high.max_requests", 3)); + EXPECT_EQ(3U, cluster.info()->resourceManager(ResourcePriority::High).requests().max()); + EXPECT_CALL(runtime.snapshot_, getInteger("circuit_breakers.name.high.max_retries", 4)); + EXPECT_EQ(4U, cluster.info()->resourceManager(ResourcePriority::High).retries().max()); + EXPECT_EQ(3U, cluster.info()->maxRequestsPerConnection()); + EXPECT_EQ(0U, cluster.info()->http2Settings().hpack_table_size_); + + cluster.info()->stats().upstream_rq_total_.inc(); + EXPECT_EQ(1UL, stats.counter("cluster.name.upstream_rq_total").value()); + + EXPECT_CALL(runtime.snapshot_, featureEnabled("upstream.maintenance_mode.name", 0)); + EXPECT_FALSE(cluster.info()->maintenanceMode()); + + ReadyWatcher membership_updated; + 
cluster.prioritySet().addMemberUpdateCb( + [&](uint32_t, const HostVector&, const HostVector&) -> void { membership_updated.ready(); }); + + cluster.initialize([] {}); + + resolver1.expectResolve(*dns_resolver); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + EXPECT_EQ("localhost1", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->hostname()); + EXPECT_EQ("localhost1", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[1]->hostname()); + + resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.2", "127.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.2", "127.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.3"})); + EXPECT_THAT( + std::list({"127.0.0.3:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + // Make sure we de-dup the same 
address. + EXPECT_CALL(*resolver2.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver2.dns_callback_(TestUtility::makeDnsResponse({"10.0.0.1", "10.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.3:11001", "10.0.0.1:11002"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); + EXPECT_EQ(1UL, + cluster.prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); + + for (const HostSharedPtr& host : cluster.prioritySet().hostSetsPerPriority()[0]->hosts()) { + EXPECT_EQ(cluster.info().get(), &host->cluster()); + } + + // Make sure we cancel. + resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + resolver2.expectResolve(*dns_resolver); + resolver2.timer_->callback_(); + + EXPECT_CALL(resolver1.active_dns_query_, cancel()); + EXPECT_CALL(resolver2.active_dns_query_, cancel()); +} + +TEST(StrictDnsClusterImplTest, LoadAssignmentBasicMultiplePriorities) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + auto dns_resolver = std::make_shared>(); + NiceMock dispatcher; + NiceMock runtime; + NiceMock local_info; + + // gmock matches in LIFO order which is why these are swapped. 
+ ResolverData resolver3(*dns_resolver, dispatcher); + ResolverData resolver2(*dns_resolver, dispatcher); + ResolverData resolver1(*dns_resolver, dispatcher); + + const std::string yaml = R"EOF( + name: name + type: STRICT_DNS + + dns_lookup_family: V4_ONLY + connect_timeout: 0.25s + dns_refresh_rate: 4s + + lb_policy: ROUND_ROBIN + + load_assignment: + endpoints: + - priority: 0 + lb_endpoints: + - endpoint: + address: + socket_address: + address: localhost1 + port_value: 11001 + health_check_config: + port_value: 8000 + - endpoint: + address: + socket_address: + address: localhost2 + port_value: 11002 + health_check_config: + port_value: 8000 + + - priority: 1 + lb_endpoints: + - endpoint: + address: + socket_address: + address: localhost3 + port_value: 11003 + health_check_config: + port_value: 8000 + )EOF"; + + NiceMock cm; + StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, dns_resolver, cm, dispatcher, false); + + ReadyWatcher membership_updated; + cluster.prioritySet().addMemberUpdateCb( + [&](uint32_t, const HostVector&, const HostVector&) -> void { membership_updated.ready(); }); + + cluster.initialize([] {}); + + resolver1.expectResolve(*dns_resolver); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.1", "127.0.0.2"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + EXPECT_EQ("localhost1", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->hostname()); + EXPECT_EQ("localhost1", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[1]->hostname()); + + resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + 
resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.2", "127.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.2", "127.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.1:11001", "127.0.0.2:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + resolver1.timer_->callback_(); + EXPECT_CALL(*resolver1.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver1.dns_callback_(TestUtility::makeDnsResponse({"127.0.0.3"})); + EXPECT_THAT( + std::list({"127.0.0.3:11001"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + // Make sure we de-dup the same address. 
+ EXPECT_CALL(*resolver2.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver2.dns_callback_(TestUtility::makeDnsResponse({"10.0.0.1", "10.0.0.1"})); + EXPECT_THAT( + std::list({"127.0.0.3:11001", "10.0.0.1:11002"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); + + EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); + EXPECT_EQ(1UL, + cluster.prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); + + for (const HostSharedPtr& host : cluster.prioritySet().hostSetsPerPriority()[0]->hosts()) { + EXPECT_EQ(cluster.info().get(), &host->cluster()); + } + + EXPECT_CALL(*resolver3.timer_, enableTimer(std::chrono::milliseconds(4000))); + EXPECT_CALL(membership_updated, ready()); + resolver3.dns_callback_(TestUtility::makeDnsResponse({"192.168.1.1", "192.168.1.2"})); + + // Make sure we have multiple priorities. + EXPECT_THAT( + std::list({"192.168.1.1:11003", "192.168.1.2:11003"}), + ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[1]->hosts()))); + + // Make sure we cancel. 
+ resolver1.expectResolve(*dns_resolver); + resolver1.timer_->callback_(); + resolver2.expectResolve(*dns_resolver); + resolver2.timer_->callback_(); + resolver3.expectResolve(*dns_resolver); + resolver3.timer_->callback_(); + + EXPECT_CALL(resolver1.active_dns_query_, cancel()); + EXPECT_CALL(resolver2.active_dns_query_, cancel()); + EXPECT_CALL(resolver3.active_dns_query_, cancel()); +} + TEST(HostImplTest, HostCluster) { MockCluster cluster; HostSharedPtr host = makeTestHost(cluster.info_, "tcp://10.0.0.1:1234", 1); @@ -419,10 +716,38 @@ TEST(HostImplTest, HostnameCanaryAndLocality) { EXPECT_EQ("world", host.locality().sub_zone()); } +TEST(StaticClusterImplTest, InitialHosts) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + NiceMock runtime; + NiceMock local_info; + const std::string yaml = R"EOF( + name: staticcluster + connect_timeout: 0.25s + type: STATIC + lb_policy: ROUND_ROBIN + hosts: + - socket_address: + address: 10.0.0.1 + port_value: 443 + )EOF"; + + NiceMock cm; + StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, cm, false); + cluster.initialize([] {}); + + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ("", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->hostname()); + EXPECT_FALSE(cluster.info()->addedViaApi()); +} + TEST(StaticClusterImplTest, EmptyHostname) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string json = R"EOF( { "name": "staticcluster", @@ -434,8 +759,41 @@ TEST(StaticClusterImplTest, EmptyHostname) { )EOF"; NiceMock cm; - StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, - false); + StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, false); + cluster.initialize([] {}); + + EXPECT_EQ(1UL, 
cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ("", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->hostname()); + EXPECT_FALSE(cluster.info()->addedViaApi()); +} + +TEST(StaticClusterImplTest, LoadAssignmentEmptyHostname) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + NiceMock runtime; + NiceMock local_info; + + const std::string yaml = R"EOF( + name: staticcluster + connect_timeout: 0.25s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 10.0.0.1 + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; + + NiceMock cm; + StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, cm, false); cluster.initialize([] {}); EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); @@ -443,10 +801,114 @@ TEST(StaticClusterImplTest, EmptyHostname) { EXPECT_FALSE(cluster.info()->addedViaApi()); } +TEST(StaticClusterImplTest, LoadAssignmentMultiplePriorities) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + NiceMock runtime; + NiceMock local_info; + + const std::string yaml = R"EOF( + name: staticcluster + connect_timeout: 0.25s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + endpoints: + - priority: 0 + lb_endpoints: + - endpoint: + address: + socket_address: + address: 10.0.0.1 + port_value: 443 + health_check_config: + port_value: 8000 + - endpoint: + address: + socket_address: + address: 10.0.0.2 + port_value: 443 + health_check_config: + port_value: 8000 + + - priority: 1 + lb_endpoints: + - endpoint: + address: + socket_address: + address: 10.0.0.3 + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; + + NiceMock cm; + StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, cm, false); + 
cluster.initialize([] {}); + + EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[1]->healthyHosts().size()); + EXPECT_EQ("", cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->hostname()); + EXPECT_FALSE(cluster.info()->addedViaApi()); +} + +TEST(StaticClusterImplTest, LoadAssignmentLocality) { + Stats::IsolatedStoreImpl stats; + Ssl::MockContextManager ssl_context_manager; + NiceMock runtime; + NiceMock local_info; + + const std::string yaml = R"EOF( + name: staticcluster + connect_timeout: 0.25s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + endpoints: + - locality: + region: oceania + zone: hello + sub_zone: world + lb_endpoints: + - endpoint: + address: + socket_address: + address: 10.0.0.1 + port_value: 443 + health_check_config: + port_value: 8000 + - endpoint: + address: + socket_address: + address: 10.0.0.2 + port_value: 443 + health_check_config: + port_value: 8000 + )EOF"; + + NiceMock cm; + StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, cm, false); + cluster.initialize([] {}); + + auto& hosts = cluster.prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 2); + for (int i = 0; i < 2; ++i) { + const auto& locality = hosts[i]->locality(); + EXPECT_EQ("oceania", locality.region()); + EXPECT_EQ("hello", locality.zone()); + EXPECT_EQ("world", locality.sub_zone()); + } + EXPECT_EQ(nullptr, cluster.prioritySet().hostSetsPerPriority()[0]->localityWeights()); + EXPECT_FALSE(cluster.info()->addedViaApi()); +} + TEST(StaticClusterImplTest, AltStatName) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; const std::string yaml = R"EOF( name: staticcluster @@ -458,8 +920,8 @@ TEST(StaticClusterImplTest, AltStatName) { )EOF"; NiceMock cm; - StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), 
runtime, stats, ssl_context_manager, cm, - false); + StaticClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, + local_info, cm, false); cluster.initialize([] {}); // Increment a stat and verify it is emitted with alt_stat_name cluster.info()->stats().upstream_rq_total_.inc(); @@ -470,6 +932,8 @@ TEST(StaticClusterImplTest, RingHash) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string json = R"EOF( { "name": "staticcluster", @@ -481,8 +945,8 @@ TEST(StaticClusterImplTest, RingHash) { )EOF"; NiceMock cm; - StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, - true); + StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, true); cluster.initialize([] {}); EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); @@ -494,6 +958,8 @@ TEST(StaticClusterImplTest, OutlierDetector) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string json = R"EOF( { "name": "addressportconfig", @@ -506,8 +972,8 @@ TEST(StaticClusterImplTest, OutlierDetector) { )EOF"; NiceMock cm; - StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, - false); + StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, false); Outlier::MockDetector* detector = new Outlier::MockDetector(); EXPECT_CALL(*detector, addChangedStateCb(_)); @@ -541,6 +1007,8 @@ TEST(StaticClusterImplTest, HealthyStat) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string json = R"EOF( { "name": "addressportconfig", @@ -553,8 +1021,8 @@ TEST(StaticClusterImplTest, HealthyStat) { )EOF"; NiceMock cm; - StaticClusterImpl 
cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, - false); + StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, false); Outlier::MockDetector* outlier_detector = new NiceMock(); cluster.setOutlierDetector(Outlier::DetectorSharedPtr{outlier_detector}); @@ -623,6 +1091,8 @@ TEST(StaticClusterImplTest, UrlConfig) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string json = R"EOF( { "name": "addressportconfig", @@ -635,8 +1105,8 @@ TEST(StaticClusterImplTest, UrlConfig) { )EOF"; NiceMock cm; - StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, - false); + StaticClusterImpl cluster(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, false); cluster.initialize([] {}); EXPECT_EQ(1024U, cluster.info()->resourceManager(ResourcePriority::Default).connections().max()); @@ -656,8 +1126,8 @@ TEST(StaticClusterImplTest, UrlConfig) { std::list({"10.0.0.1:11001", "10.0.0.2:11002"}), ContainerEq(hostListToAddresses(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()))); EXPECT_EQ(2UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); - EXPECT_EQ(0UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); - EXPECT_EQ(0UL, + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); + EXPECT_EQ(1UL, cluster.prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0]->healthChecker().setUnhealthy(); } @@ -667,6 +1137,8 @@ TEST(StaticClusterImplTest, UnsupportedLBType) { Ssl::MockContextManager ssl_context_manager; NiceMock runtime; NiceMock cm; + NiceMock local_info; + const std::string json = R"EOF( { "name": "addressportconfig", @@ -678,15 +1150,17 @@ 
TEST(StaticClusterImplTest, UnsupportedLBType) { } )EOF"; - EXPECT_THROW( - StaticClusterImpl(parseClusterFromJson(json), runtime, stats, ssl_context_manager, cm, false), - EnvoyException); + EXPECT_THROW(StaticClusterImpl(parseClusterFromJson(json), runtime, stats, ssl_context_manager, + local_info, cm, false), + EnvoyException); } TEST(StaticClusterImplTest, MalformedHostIP) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + const std::string yaml = R"EOF( name: name connect_timeout: 0.25s @@ -697,7 +1171,7 @@ TEST(StaticClusterImplTest, MalformedHostIP) { NiceMock cm; EXPECT_THROW_WITH_MESSAGE(StaticClusterImpl(parseClusterFromV2Yaml(yaml), runtime, stats, - ssl_context_manager, cm, false), + ssl_context_manager, local_info, cm, false), EnvoyException, "malformed IP address: foo.bar.com. Consider setting resolver_name or " "setting cluster type to 'STRICT_DNS' or 'LOGICAL_DNS'"); @@ -739,6 +1213,8 @@ TEST(StaticClusterImplTest, SourceAddressPriority) { Stats::IsolatedStoreImpl stats; Ssl::MockContextManager ssl_context_manager; NiceMock runtime; + NiceMock local_info; + envoy::api::v2::Cluster config; config.set_name("staticcluster"); config.mutable_connect_timeout(); @@ -747,7 +1223,7 @@ TEST(StaticClusterImplTest, SourceAddressPriority) { // If the cluster manager gets a source address from the bootstrap proto, use it. NiceMock cm; cm.bind_config_.mutable_source_address()->set_address("1.2.3.5"); - StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, cm, false); + StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, local_info, cm, false); EXPECT_EQ("1.2.3.5:0", cluster.info()->sourceAddress()->asString()); } @@ -756,7 +1232,7 @@ TEST(StaticClusterImplTest, SourceAddressPriority) { { // Verify source address from cluster config is used when present. 
NiceMock cm; - StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, cm, false); + StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, local_info, cm, false); EXPECT_EQ(cluster_address, cluster.info()->sourceAddress()->ip()->addressAsString()); } @@ -764,7 +1240,7 @@ TEST(StaticClusterImplTest, SourceAddressPriority) { // The source address from cluster config takes precedence over one from the bootstrap proto. NiceMock cm; cm.bind_config_.mutable_source_address()->set_address("1.2.3.5"); - StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, cm, false); + StaticClusterImpl cluster(config, runtime, stats, ssl_context_manager, local_info, cm, false); EXPECT_EQ(cluster_address, cluster.info()->sourceAddress()->ip()->addressAsString()); } } @@ -778,6 +1254,7 @@ TEST(ClusterImplTest, CloseConnectionsOnHostHealthFailure) { NiceMock dispatcher; NiceMock runtime; NiceMock cm; + NiceMock local_info; ReadyWatcher initialized; const std::string yaml = R"EOF( @@ -789,7 +1266,7 @@ TEST(ClusterImplTest, CloseConnectionsOnHostHealthFailure) { hosts: [{ socket_address: { address: foo.bar.com, port_value: 443 }}] )EOF"; StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); EXPECT_TRUE(cluster.info()->features() & ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE); } @@ -851,6 +1328,7 @@ TEST(ClusterMetadataTest, Metadata) { NiceMock dispatcher; NiceMock runtime; NiceMock cm; + NiceMock local_info; ReadyWatcher initialized; const std::string yaml = R"EOF( @@ -866,7 +1344,7 @@ TEST(ClusterMetadataTest, Metadata) { )EOF"; StrictDnsClusterImpl cluster(parseClusterFromV2Yaml(yaml), runtime, stats, ssl_context_manager, - dns_resolver, cm, dispatcher, false); + local_info, dns_resolver, cm, dispatcher, false); EXPECT_EQ("test_value", 
Config::Metadata::metadataValue(cluster.info()->metadata(), "com.bar.foo", "baz") .string_value()); From 18e98f4d501120f77650c0add3e8d343352a7780 Mon Sep 17 00:00:00 2001 From: Dhi Aurrahman Date: Thu, 19 Jul 2018 04:09:56 +0700 Subject: [PATCH 2/4] review: docs update Signed-off-by: Dhi Aurrahman --- docs/root/configuration/overview/v2_overview.rst | 2 +- docs/root/intro/arch_overview/health_checking.rst | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/docs/root/configuration/overview/v2_overview.rst b/docs/root/configuration/overview/v2_overview.rst index 3b9b92226245a..9939941d90df3 100644 --- a/docs/root/configuration/overview/v2_overview.rst +++ b/docs/root/configuration/overview/v2_overview.rst @@ -564,7 +564,7 @@ the shared ADS channel. Management Server Unreachability -------------------------------- -When Envoy instance looses connectivity with the management server, Envoy will latch on to +When Envoy instance loses connectivity with the management server, Envoy will latch on to the previous configuration while actively retrying in the background to reestablish the connection with the management server. diff --git a/docs/root/intro/arch_overview/health_checking.rst b/docs/root/intro/arch_overview/health_checking.rst index 017a5564e8f50..08812c0b62e56 100644 --- a/docs/root/intro/arch_overview/health_checking.rst +++ b/docs/root/intro/arch_overview/health_checking.rst @@ -31,11 +31,10 @@ Per cluster member health check config If active health checking is configured for an upstream cluster, a specific additional configuration for each registered member can be specified by setting the -:ref:`health check config` -in :ref:`endpoint message` -of an :ref:`LbEndpoint` of each defined -:ref:`LocalityLbEndpoints` -in a :ref:`ClusterLoadAssignment`. +:ref:`HealthCheckConfig` +in the :ref:`Endpoint` of an :ref:`LbEndpoint` +of each defined :ref:`LocalityLbEndpoints` in a +:ref:`ClusterLoadAssignment`. 
An example of setting up :ref:`health check config` to set a :ref:`cluster member`'s alternative health check @@ -49,7 +48,6 @@ to set a :ref:`cluster member`'s alternative he - endpoint: health_check_config: port_value: 8080 - address: socket_address: address: localhost From a7c9ec9cd2a8f22bbcc82d2ded9117e5d1168ef1 Mon Sep 17 00:00:00 2001 From: Dhi Aurrahman Date: Thu, 19 Jul 2018 05:46:31 +0700 Subject: [PATCH 3/4] Trigger re-build Signed-off-by: Dhi Aurrahman From 75a15373164abbef989ce0c7df39e02f1809e0aa Mon Sep 17 00:00:00 2001 From: Dhi Aurrahman Date: Sat, 21 Jul 2018 04:38:11 +0700 Subject: [PATCH 4/4] review comments Signed-off-by: Dhi Aurrahman --- docs/root/configuration/overview/v2_overview.rst | 2 +- source/common/upstream/logical_dns_cluster.cc | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/root/configuration/overview/v2_overview.rst b/docs/root/configuration/overview/v2_overview.rst index 9939941d90df3..3c8e0aff06c99 100644 --- a/docs/root/configuration/overview/v2_overview.rst +++ b/docs/root/configuration/overview/v2_overview.rst @@ -564,7 +564,7 @@ the shared ADS channel. Management Server Unreachability -------------------------------- -When Envoy instance loses connectivity with the management server, Envoy will latch on to +When an Envoy instance loses connectivity with the management server, Envoy will latch on to the previous configuration while actively retrying in the background to reestablish the connection with the management server. 
diff --git a/source/common/upstream/logical_dns_cluster.cc b/source/common/upstream/logical_dns_cluster.cc index cc2f4d44c7ba3..5b3709041b021 100644 --- a/source/common/upstream/logical_dns_cluster.cc +++ b/source/common/upstream/logical_dns_cluster.cc @@ -35,10 +35,12 @@ LogicalDnsCluster::LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, : Config::Utility::translateClusterHosts(cluster.hosts())) { const auto& locality_lb_endpoints = load_assignment_.endpoints(); if (locality_lb_endpoints.size() != 1 || locality_lb_endpoints[0].lb_endpoints().size() != 1) { - throw EnvoyException(fmt::format("LOGICAL_DNS clusters must have {}", - cluster.has_load_assignment() - ? "a single locality_lb_endpoint and a single lb_endpoint" - : "a single host")); + if (cluster.has_load_assignment()) { + throw EnvoyException( + "LOGICAL_DNS clusters must have a single locality_lb_endpoint and a single lb_endpoint"); + } else { + throw EnvoyException("LOGICAL_DNS clusters must have a single host"); + } } switch (cluster.dns_lookup_family()) {