From 7dced383c1da90f7bdaa3090e13fd349dbbf90d4 Mon Sep 17 00:00:00 2001 From: Lilika Markatou Date: Mon, 6 Aug 2018 17:56:08 -0400 Subject: [PATCH 1/3] Revert "Revert "Basic Implementation of HDS (#3973)" (#4063)" This reverts commit 4e52589533a4a85846a0e0df8be6e75c9ea012c2. Signed-off-by: Lilika Markatou --- include/envoy/upstream/BUILD | 2 + include/envoy/upstream/cluster_manager.h | 27 ++ source/common/upstream/BUILD | 7 + .../upstream/health_discovery_service.cc | 197 +++++++- .../upstream/health_discovery_service.h | 139 +++++- source/server/server.cc | 3 +- source/server/server.h | 1 + test/common/upstream/BUILD | 18 + test/common/upstream/hds_test.cc | 287 ++++++++++++ test/integration/BUILD | 10 +- test/integration/fake_upstream.h | 3 + test/integration/hds_integration_test.cc | 440 ++++++++++++++++-- test/mocks/upstream/BUILD | 1 + test/mocks/upstream/mocks.h | 11 + 14 files changed, 1065 insertions(+), 81 deletions(-) create mode 100644 test/common/upstream/hds_test.cc diff --git a/include/envoy/upstream/BUILD b/include/envoy/upstream/BUILD index 3c5c8312df637..e59ddb8a392fa 100644 --- a/include/envoy/upstream/BUILD +++ b/include/envoy/upstream/BUILD @@ -106,6 +106,8 @@ envoy_cc_library( "//include/envoy/http:codec_interface", "//include/envoy/network:connection_interface", "//include/envoy/network:transport_socket_interface", + "//include/envoy/runtime:runtime_interface", "//include/envoy/ssl:context_interface", + "//include/envoy/ssl:context_manager_interface", ], ) diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 1abdd3361df9e..07c93b67e48a5 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -17,6 +17,7 @@ #include "envoy/runtime/runtime.h" #include "envoy/secret/secret_manager.h" #include "envoy/server/admin.h" +#include "envoy/ssl/context_manager.h" #include "envoy/tcp/conn_pool.h" #include "envoy/upstream/health_checker.h" #include 
"envoy/upstream/load_balancer.h" @@ -293,5 +294,31 @@ class ClusterManagerFactory { virtual Secret::SecretManager& secretManager() PURE; }; +/** + * Factory for creating ClusterInfo + */ +class ClusterInfoFactory { +public: + virtual ~ClusterInfoFactory() {} + + /** + * This method returns a Upstream::ClusterInfoConstSharedPtr + * + * @param runtime supplies the runtime loader. + * @param cluster supplies the owning cluster. + * @param bind_config supplies information on binding newly established connections. + * @param stats supplies a store for all known counters, gauges, and timers. + * @param ssl_context_manager supplies a manager for all SSL contexts. + * @param secret_manager supplies a manager for static secrets. + * @param added_via_api denotes whether this was added via API. + * @return Upstream::ClusterInfoConstSharedPtr + */ + virtual Upstream::ClusterInfoConstSharedPtr + createClusterInfo(Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster, + const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats, + Ssl::ContextManager& ssl_context_manager, Secret::SecretManager& secret_manager, + bool added_via_api) PURE; +}; + } // namespace Upstream } // namespace Envoy diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index 545a660a4368c..f13b4b490ba34 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -166,10 +166,17 @@ envoy_cc_library( srcs = ["health_discovery_service.cc"], hdrs = ["health_discovery_service.h"], deps = [ + ":health_checker_lib", + ":upstream_includes", "//include/envoy/event:dispatcher_interface", + "//include/envoy/runtime:runtime_interface", + "//include/envoy/ssl:context_manager_interface", "//include/envoy/stats:stats_macros", + "//include/envoy/upstream:cluster_manager_interface", + "//include/envoy/upstream:upstream_interface", "//source/common/common:minimal_logger_lib", "//source/common/grpc:async_client_lib", + "//source/common/network:resolver_lib", 
"@envoy_api//envoy/service/discovery/v2:hds_cc", ], ) diff --git a/source/common/upstream/health_discovery_service.cc b/source/common/upstream/health_discovery_service.cc index 10d04488bf955..1a5fcd76d5dd5 100644 --- a/source/common/upstream/health_discovery_service.cc +++ b/source/common/upstream/health_discovery_service.cc @@ -6,19 +6,31 @@ namespace Envoy { namespace Upstream { HdsDelegate::HdsDelegate(const envoy::api::v2::core::Node& node, Stats::Scope& scope, - Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher) + Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher, + Runtime::Loader& runtime, Envoy::Stats::Store& stats, + Ssl::ContextManager& ssl_context_manager, + Secret::SecretManager& secret_manager, Runtime::RandomGenerator& random, + ClusterInfoFactory& info_factory, + AccessLog::AccessLogManager& access_log_manager) : stats_{ALL_HDS_STATS(POOL_COUNTER_PREFIX(scope, "hds_delegate."))}, - async_client_(std::move(async_client)), service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v2.HealthDiscoveryService.StreamHealthCheck")) { + "envoy.service.discovery.v2.HealthDiscoveryService.StreamHealthCheck")), + async_client_(std::move(async_client)), dispatcher_(dispatcher), runtime_(runtime), + store_stats(stats), ssl_context_manager_(ssl_context_manager), + secret_manager_(secret_manager), random_(random), info_factory_(info_factory), + access_log_manager_(access_log_manager) { health_check_request_.mutable_node()->MergeFrom(node); - retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); }); - response_timer_ = dispatcher.createTimer([this]() -> void { sendHealthCheckRequest(); }); + hds_retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); }); + hds_stream_response_timer_ = dispatcher.createTimer([this]() -> void { sendResponse(); }); establishNewStream(); } -void HdsDelegate::setRetryTimer() { - 
retry_timer_->enableTimer(std::chrono::milliseconds(RETRY_DELAY_MS)); +void HdsDelegate::setHdsRetryTimer() { + hds_retry_timer_->enableTimer(std::chrono::milliseconds(RetryDelayMilliseconds)); +} + +void HdsDelegate::setHdsStreamResponseTimer() { + hds_stream_response_timer_->enableTimer(std::chrono::milliseconds(server_response_ms_)); } void HdsDelegate::establishNewStream() { @@ -30,20 +42,46 @@ void HdsDelegate::establishNewStream() { return; } - sendHealthCheckRequest(); -} - -void HdsDelegate::sendHealthCheckRequest() { - ENVOY_LOG(debug, "Sending HealthCheckRequest"); + // TODO(lilika): Add support for other types of healthchecks + health_check_request_.mutable_capability()->add_health_check_protocol( + envoy::service::discovery::v2::Capability::HTTP); + ENVOY_LOG(debug, "Sending HealthCheckRequest {} ", health_check_request_.DebugString()); stream_->sendMessage(health_check_request_, false); stats_.responses_.inc(); } +// TODO(lilika) : Use jittered backoff as in https://github.com/envoyproxy/envoy/pull/3791 void HdsDelegate::handleFailure() { - ENVOY_LOG(warn, "Load reporter stats stream/connection failure, will retry in {} ms.", - RETRY_DELAY_MS); + ENVOY_LOG(warn, "HdsDelegate stream/connection failure, will retry in {} ms.", + RetryDelayMilliseconds); stats_.errors_.inc(); - setRetryTimer(); + setHdsRetryTimer(); +} + +// TODO(lilika): Add support for the same endpoint in different clusters/ports +envoy::service::discovery::v2::HealthCheckRequestOrEndpointHealthResponse +HdsDelegate::sendResponse() { + envoy::service::discovery::v2::HealthCheckRequestOrEndpointHealthResponse response; + for (const auto& cluster : hds_clusters_) { + for (const auto& hosts : cluster->prioritySet().hostSetsPerPriority()) { + for (const auto& host : hosts->hosts()) { + auto* endpoint = response.mutable_endpoint_health_response()->add_endpoints_health(); + Network::Utility::addressToProtobufAddress( + *host->address(), *endpoint->mutable_endpoint()->mutable_address()); + // 
TODO(lilika): Add support for more granular options of envoy::api::v2::core::HealthStatus + if (host->healthy()) { + endpoint->set_health_status(envoy::api::v2::core::HealthStatus::HEALTHY); + } else { + endpoint->set_health_status(envoy::api::v2::core::HealthStatus::UNHEALTHY); + } + } + } + } + ENVOY_LOG(debug, "Sending EndpointHealthResponse to server {}", response.DebugString()); + stream_->sendMessage(response, false); + stats_.responses_.inc(); + setHdsStreamResponseTimer(); + return response; } void HdsDelegate::onCreateInitialMetadata(Http::HeaderMap& metadata) { @@ -54,12 +92,59 @@ void HdsDelegate::onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) { UNREFERENCED_PARAMETER(metadata); } +void HdsDelegate::processMessage( + std::unique_ptr&& message) { + ENVOY_LOG(debug, "New health check response message {} ", message->DebugString()); + ASSERT(message); + + for (const auto& cluster_health_check : message->health_check()) { + // Create HdsCluster config + static const envoy::api::v2::core::BindConfig bind_config; + envoy::api::v2::Cluster cluster_config; + + cluster_config.set_name(cluster_health_check.cluster_name()); + cluster_config.mutable_connect_timeout()->set_seconds(ClusterTimeoutSeconds); + cluster_config.mutable_per_connection_buffer_limit_bytes()->set_value( + ClusterConnectionBufferLimitBytes); + + // Add endpoints to cluster + for (const auto& locality_endpoints : cluster_health_check.endpoints()) { + for (const auto& endpoint : locality_endpoints.endpoints()) { + cluster_config.add_hosts()->MergeFrom(endpoint.address()); + } + } + + // TODO(lilika): Add support for optional per-endpoint health checks + + // Add healthchecks to cluster + for (auto& health_check : cluster_health_check.health_checks()) { + cluster_config.add_health_checks()->MergeFrom(health_check); + } + + ENVOY_LOG(debug, "New HdsCluster config {} ", cluster_config.DebugString()); + + // Create HdsCluster + hds_clusters_.emplace_back(new HdsCluster(runtime_, 
cluster_config, bind_config, store_stats, + ssl_context_manager_, secret_manager_, false, + info_factory_)); + + hds_clusters_.back()->startHealthchecks(access_log_manager_, runtime_, random_, dispatcher_); + } +} + +// TODO(lilika): Add support for subsequent HealthCheckSpecifier messages that +// might modify the HdsClusters void HdsDelegate::onReceiveMessage( std::unique_ptr&& message) { - ENVOY_LOG(debug, "New health check response ", message->DebugString()); stats_.requests_.inc(); - stream_->sendMessage(health_check_request_, false); - stats_.responses_.inc(); + ENVOY_LOG(debug, "New health check response message {} ", message->DebugString()); + + // Process the HealthCheckSpecifier message + processMessage(std::move(message)); + + // Set response + server_response_ms_ = PROTOBUF_GET_MS_REQUIRED(*message, interval); + setHdsStreamResponseTimer(); } void HdsDelegate::onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) { @@ -68,10 +153,84 @@ void HdsDelegate::onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) { void HdsDelegate::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) { ENVOY_LOG(warn, "gRPC config stream closed: {}, {}", status, message); - response_timer_->disableTimer(); + hds_stream_response_timer_->disableTimer(); stream_ = nullptr; handleFailure(); } +HdsCluster::HdsCluster(Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster, + const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats, + Ssl::ContextManager& ssl_context_manager, + Secret::SecretManager& secret_manager, bool added_via_api, + ClusterInfoFactory& info_factory) + : runtime_(runtime), cluster_(cluster), bind_config_(bind_config), stats_(stats), + ssl_context_manager_(ssl_context_manager), secret_manager_(secret_manager), + added_via_api_(added_via_api), initial_hosts_(new HostVector()) { + ENVOY_LOG(debug, "Creating an HdsCluster"); + priority_set_.getOrCreateHostSet(0); + + info_ = 
info_factory.createClusterInfo(runtime_, cluster_, bind_config_, stats_, + ssl_context_manager_, secret_manager_, added_via_api_); + + for (const auto& host : cluster.hosts()) { + initial_hosts_->emplace_back( + new HostImpl(info_, "", Network::Address::resolveProtoAddress(host), + envoy::api::v2::core::Metadata::default_instance(), 1, + envoy::api::v2::core::Locality().default_instance(), + envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance())); + } + initialize([] {}); +} + +ClusterSharedPtr HdsCluster::create() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + +HostVectorConstSharedPtr HdsCluster::createHealthyHostList(const HostVector& hosts) { + HostVectorSharedPtr healthy_list(new HostVector()); + for (const auto& host : hosts) { + if (host->healthy()) { + healthy_list->emplace_back(host); + } + } + return healthy_list; +} + +ClusterInfoConstSharedPtr ProdClusterInfoFactory::createClusterInfo( + Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster, + const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats, + Ssl::ContextManager& ssl_context_manager, Secret::SecretManager& secret_manager, + bool added_via_api) { + + return std::make_unique(cluster, bind_config, runtime, stats, + ssl_context_manager, secret_manager, added_via_api); +} + +void HdsCluster::startHealthchecks(AccessLog::AccessLogManager& access_log_manager, + Runtime::Loader& runtime, Runtime::RandomGenerator& random, + Event::Dispatcher& dispatcher) { + + for (auto& health_check : cluster_.health_checks()) { + health_checkers_.push_back(Upstream::HealthCheckerFactory::create( + health_check, *this, runtime, random, dispatcher, access_log_manager)); + health_checkers_.back()->start(); + } +} + +void HdsCluster::initialize(std::function callback) { + initialization_complete_callback_ = callback; + for (const auto& host : *initial_hosts_) { + host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); + } + + auto& first_host_set = 
priority_set_.getOrCreateHostSet(0); + auto healthy = createHealthyHostList(*initial_hosts_); + + first_host_set.updateHosts(initial_hosts_, healthy, HostsPerLocalityImpl::empty(), + HostsPerLocalityImpl::empty(), {}, *initial_hosts_, {}); +} + +void HdsCluster::setOutlierDetector(const Outlier::DetectorSharedPtr&) { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; +} + } // namespace Upstream } // namespace Envoy diff --git a/source/common/upstream/health_discovery_service.h b/source/common/upstream/health_discovery_service.h index 980d8a450bae9..f167a20324fb8 100644 --- a/source/common/upstream/health_discovery_service.h +++ b/source/common/upstream/health_discovery_service.h @@ -2,16 +2,88 @@ #include "envoy/event/dispatcher.h" #include "envoy/service/discovery/v2/hds.pb.h" +#include "envoy/ssl/context_manager.h" #include "envoy/stats/stats_macros.h" +#include "envoy/upstream/upstream.h" #include "common/common/logger.h" #include "common/grpc/async_client_impl.h" +#include "common/network/resolver_impl.h" +#include "common/upstream/health_checker_impl.h" +#include "common/upstream/upstream_impl.h" namespace Envoy { namespace Upstream { +class ProdClusterInfoFactory : public ClusterInfoFactory, Logger::Loggable { +public: + ClusterInfoConstSharedPtr + createClusterInfo(Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster, + const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats, + Ssl::ContextManager& ssl_context_manager, Secret::SecretManager& secret_manager, + bool added_via_api) override; +}; + +// TODO(lilika): Add HdsClusters to the /clusters endpoint to get detailed stats about each HC host. 
+ +/** + * Implementation of Upstream::Cluster for hds clusters, clusters that are used + * by HdsDelegates + */ + +class HdsCluster : public Cluster, Logger::Loggable { +public: + static ClusterSharedPtr create(); + HdsCluster(Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster, + const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats, + Ssl::ContextManager& ssl_context_manager, Secret::SecretManager& secret_manager, + bool added_via_api, ClusterInfoFactory& info_factory); + + // From Upstream::Cluster + InitializePhase initializePhase() const override { return InitializePhase::Primary; } + PrioritySet& prioritySet() override { return priority_set_; } + const PrioritySet& prioritySet() const override { return priority_set_; } + void setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector); + HealthChecker* healthChecker() override { return health_checker_.get(); } + ClusterInfoConstSharedPtr info() const override { return info_; } + Outlier::Detector* outlierDetector() override { return outlier_detector_.get(); } + const Outlier::Detector* outlierDetector() const override { return outlier_detector_.get(); } + void initialize(std::function callback) override; + + // Creates and starts healthcheckers to its endpoints + void startHealthchecks(AccessLog::AccessLogManager& access_log_manager, Runtime::Loader& runtime, + Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher); + + std::vector healthCheckers() { return health_checkers_; }; + +protected: + PrioritySetImpl priority_set_; + HealthCheckerSharedPtr health_checker_; + Outlier::DetectorSharedPtr outlier_detector_; + + // Creates a vector containing any healthy hosts + static HostVectorConstSharedPtr createHealthyHostList(const HostVector& hosts); + +private: + std::function initialization_complete_callback_; + + Runtime::Loader& runtime_; + const envoy::api::v2::Cluster& cluster_; + const envoy::api::v2::core::BindConfig& bind_config_; + Stats::Store& 
stats_; + Ssl::ContextManager& ssl_context_manager_; + Secret::SecretManager& secret_manager_; + bool added_via_api_; + + HostVectorSharedPtr initial_hosts_; + ClusterInfoConstSharedPtr info_; + std::vector health_checkers_; +}; + +typedef std::shared_ptr HdsClusterPtr; + /** - * All load reporter stats. @see stats_macros.h + * All hds stats. @see stats_macros.h */ // clang-format off #define ALL_HDS_STATS(COUNTER) \ @@ -21,18 +93,29 @@ namespace Upstream { // clang-format on /** - * Struct definition for all load reporter stats. @see stats_macros.h + * Struct definition for all hds stats. @see stats_macros.h */ struct HdsDelegateStats { ALL_HDS_STATS(GENERATE_COUNTER_STRUCT) }; +// TODO(lilika): Add /config_dump support for HdsDelegate + +/** + * The HdsDelegate class is responsible for receiving requests from a management + * server with a set of hosts to healthcheck, healthchecking them, and reporting + * back the results. + */ class HdsDelegate : Grpc::TypedAsyncStreamCallbacks, Logger::Loggable { public: HdsDelegate(const envoy::api::v2::core::Node& node, Stats::Scope& scope, - Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher); + Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher, + Runtime::Loader& runtime, Envoy::Stats::Store& stats, + Ssl::ContextManager& ssl_context_manager, Secret::SecretManager& secret_manager, + Runtime::RandomGenerator& random, ClusterInfoFactory& info_factory, + AccessLog::AccessLogManager& access_log_manager); // Grpc::TypedAsyncStreamCallbacks void onCreateInitialMetadata(Http::HeaderMap& metadata) override; @@ -41,25 +124,59 @@ class HdsDelegate std::unique_ptr&& message) override; void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) override; void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override; + envoy::service::discovery::v2::HealthCheckRequestOrEndpointHealthResponse sendResponse(); - // TODO(htuch): Make this configurable or some static. 
- const uint32_t RETRY_DELAY_MS = 5000; + std::vector hdsClusters() { return hds_clusters_; }; private: - void setRetryTimer(); - void establishNewStream(); - void sendHealthCheckRequest(); + friend class HdsDelegateFriend; + + void setHdsRetryTimer(); + void setHdsStreamResponseTimer(); void handleFailure(); + // Establishes a connection with the management server + void establishNewStream(); + void + processMessage(std::unique_ptr&& message); HdsDelegateStats stats_; + const Protobuf::MethodDescriptor& service_method_; + Grpc::AsyncClientPtr async_client_; Grpc::AsyncStream* stream_{}; - const Protobuf::MethodDescriptor& service_method_; - Event::TimerPtr retry_timer_; - Event::TimerPtr response_timer_; + Event::Dispatcher& dispatcher_; + Runtime::Loader& runtime_; + Envoy::Stats::Store& store_stats; + Ssl::ContextManager& ssl_context_manager_; + Secret::SecretManager& secret_manager_; + Runtime::RandomGenerator& random_; + ClusterInfoFactory& info_factory_; + AccessLog::AccessLogManager& access_log_manager_; + envoy::service::discovery::v2::HealthCheckRequest health_check_request_; std::unique_ptr health_check_message_; + std::vector clusters_; + std::vector hds_clusters_; + + Event::TimerPtr hds_stream_response_timer_; + Event::TimerPtr hds_retry_timer_; + + // TODO(lilika): Add API knob for RetryDelayMilliseconds, instead of + // hardcoding it. + // How often we retry to establish a stream to the management server + const uint32_t RetryDelayMilliseconds = 5000; + + // Soft limit on size of the cluster’s connections read and write buffers. + static constexpr uint32_t ClusterConnectionBufferLimitBytes = 32768; + + // TODO(lilika): Add API knob for ClusterTimeoutSeconds, instead of + // hardcoding it. + // The timeout for new network connections to hosts in the cluster. 
+ static constexpr uint32_t ClusterTimeoutSeconds = 1; + + // How often envoy reports the healthcheck results to the server + uint32_t server_response_ms_ = 0; }; typedef std::unique_ptr HdsDelegatePtr; diff --git a/source/server/server.cc b/source/server/server.cc index cc53893403c71..896359fc7ea66 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -289,7 +289,8 @@ void InstanceImpl::initialize(Options& options, bootstrap_.node(), stats(), Config::Utility::factoryForGrpcApiConfigSource(*async_client_manager_, hds_config, stats()) ->create(), - dispatcher())); + dispatcher(), runtime(), stats(), sslContextManager(), secretManager(), random(), + info_factory_, access_log_manager_)); } for (Stats::SinkPtr& sink : main_config->statsSinks()) { diff --git a/source/server/server.h b/source/server/server.h index 58ca46beb5d29..e3f05572d1bb0 100644 --- a/source/server/server.h +++ b/source/server/server.h @@ -221,6 +221,7 @@ class InstanceImpl : Logger::Loggable, public Instance { ConfigTracker::EntryOwnerPtr config_tracker_entry_; SystemTime bootstrap_config_update_time_; Grpc::AsyncClientManagerPtr async_client_manager_; + Upstream::ProdClusterInfoFactory info_factory_; Upstream::HdsDelegatePtr hds_delegate_; }; diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index 789dd66c28a1a..357c286b10606 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -162,6 +162,24 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "hds_test", + srcs = ["hds_test.cc"], + deps = [ + "//source/common/ssl:context_lib", + "//source/common/stats:stats_lib", + "//source/common/upstream:health_discovery_service_lib", + "//test/mocks/access_log:access_log_mocks", + "//test/mocks/event:event_mocks", + "//test/mocks/grpc:grpc_mocks", + "//test/mocks/network:network_mocks", + "//test/mocks/upstream:upstream_mocks", + "//test/test_common:utility_lib", + "@envoy_api//envoy/api/v2/endpoint:load_report_cc", + 
"@envoy_api//envoy/service/discovery/v2:hds_cc", + ], +) + envoy_cc_test( name = "logical_dns_cluster_test", srcs = ["logical_dns_cluster_test.cc"], diff --git a/test/common/upstream/hds_test.cc b/test/common/upstream/hds_test.cc new file mode 100644 index 0000000000000..b175e525433e3 --- /dev/null +++ b/test/common/upstream/hds_test.cc @@ -0,0 +1,287 @@ +#include "envoy/service/discovery/v2/hds.pb.h" + +#include "common/ssl/context_manager_impl.h" +#include "common/stats/stats_impl.h" +#include "common/upstream/health_discovery_service.h" + +#include "test/mocks/access_log/mocks.h" +#include "test/mocks/event/mocks.h" +#include "test/mocks/grpc/mocks.h" +#include "test/mocks/network/mocks.h" +#include "test/mocks/upstream/mocks.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::InSequence; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::_; + +using ::testing::AtLeast; + +namespace Envoy { +namespace Upstream { + +// Friend class of HdsDelegate, making it easier to access private fields +class HdsDelegateFriend { +public: + // Allows access to private function processMessage + void processPrivateMessage( + HdsDelegate& hd, + std::unique_ptr&& message) { + hd.processMessage(std::move(message)); + }; +}; + +class HdsTest : public testing::Test { +public: + HdsTest() + : retry_timer_(new Event::MockTimer()), server_response_timer_(new Event::MockTimer()), + async_client_(new Grpc::MockAsyncClient()) { + node_.set_id("foo"); + } + + // Creates an HdsDelegate + void createHdsDelegate() { + InSequence s; + EXPECT_CALL(dispatcher_, createTimer_(_)).WillOnce(Invoke([this](Event::TimerCb timer_cb) { + retry_timer_cb_ = timer_cb; + return retry_timer_; + })); + EXPECT_CALL(dispatcher_, createTimer_(_)) + .Times(AtLeast(1)) + .WillOnce(Invoke([this](Event::TimerCb timer_cb) { + server_response_timer_cb_ = timer_cb; + return server_response_timer_; + })); + 
hds_delegate_.reset(new HdsDelegate(node_, stats_store_, Grpc::AsyncClientPtr(async_client_), + dispatcher_, runtime_, stats_store_, ssl_context_manager_, + secret_manager_, random_, test_factory_, log_manager_)); + } + + // Creates a HealthCheckSpecifier message that contains one endpoint and one + // healthcheck + envoy::service::discovery::v2::HealthCheckSpecifier* createSimpleMessage() { + envoy::service::discovery::v2::HealthCheckSpecifier* msg = + new envoy::service::discovery::v2::HealthCheckSpecifier; + msg->mutable_interval()->set_seconds(1); + + auto* health_check = msg->add_health_check(); + health_check->set_cluster_name("anna"); + health_check->add_health_checks()->mutable_timeout()->set_seconds(1); + health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(1); + health_check->mutable_health_checks(0)->mutable_unhealthy_threshold()->set_value(2); + health_check->mutable_health_checks(0)->mutable_healthy_threshold()->set_value(2); + health_check->mutable_health_checks(0)->mutable_grpc_health_check(); + health_check->mutable_health_checks(0)->mutable_http_health_check()->set_use_http2(false); + health_check->mutable_health_checks(0)->mutable_http_health_check()->set_path("/healthcheck"); + + auto* socket_address = + health_check->add_endpoints()->add_endpoints()->mutable_address()->mutable_socket_address(); + socket_address->set_address("127.0.0.0"); + socket_address->set_port_value(1234); + + return msg; + } + + envoy::api::v2::core::Node node_; + Event::MockDispatcher dispatcher_; + Stats::IsolatedStoreImpl stats_store_; + MockClusterInfoFactory test_factory_; + + std::unique_ptr hds_delegate_; + HdsDelegateFriend hds_delegate_friend_; + + Event::MockTimer* retry_timer_; + Event::TimerCb retry_timer_cb_; + Event::MockTimer* server_response_timer_; + Event::TimerCb server_response_timer_cb_; + + std::shared_ptr cluster_info_{ + new NiceMock()}; + std::unique_ptr message; + Grpc::MockAsyncStream async_stream_; + Grpc::MockAsyncClient* 
async_client_; + Runtime::MockLoader runtime_; + Ssl::ContextManagerImpl ssl_context_manager_{runtime_}; + Secret::MockSecretManager secret_manager_; + NiceMock random_; + NiceMock log_manager_; +}; + +// Test if processMessage processes endpoints from a HealthCheckSpecifier +// message correctly +TEST_F(HdsTest, TestProcessMessageEndpoints) { + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + EXPECT_CALL(async_stream_, sendMessage(_, _)); + createHdsDelegate(); + + // Create Message + // - Cluster "anna0" with 3 endpoints + // - Cluster "anna1" with 3 endpoints + message.reset(new envoy::service::discovery::v2::HealthCheckSpecifier); + message->mutable_interval()->set_seconds(1); + + for (int i = 0; i < 2; i++) { + auto* health_check = message->add_health_check(); + health_check->set_cluster_name("anna" + std::to_string(i)); + for (int j = 0; j < 3; j++) { + auto* address = health_check->add_endpoints()->add_endpoints()->mutable_address(); + address->mutable_socket_address()->set_address("127.0.0." + std::to_string(i)); + address->mutable_socket_address()->set_port_value(1234 + j); + } + } + + // Process message + EXPECT_CALL(test_factory_, createClusterInfo(_, _, _, _, _, _, _)).Times(2); + hds_delegate_friend_.processPrivateMessage(*hds_delegate_, std::move(message)); + + // Check Correctness + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 3; j++) { + auto& host = + hds_delegate_->hdsClusters()[i]->prioritySet().hostSetsPerPriority()[0]->hosts()[j]; + EXPECT_EQ(host->address()->ip()->addressAsString(), "127.0.0." 
+ std::to_string(i)); + EXPECT_EQ(host->address()->ip()->port(), 1234 + j); + } + } +} + +// Test if processMessage processes health checks from a HealthCheckSpecifier +// message correctly +TEST_F(HdsTest, TestProcessMessageHealthChecks) { + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + EXPECT_CALL(async_stream_, sendMessage(_, _)); + createHdsDelegate(); + + // Create Message + // - Cluster "minkowski0" with 2 health_checks + // - Cluster "minkowski1" with 3 health_checks + message.reset(new envoy::service::discovery::v2::HealthCheckSpecifier); + message->mutable_interval()->set_seconds(1); + + for (int i = 0; i < 2; i++) { + auto* health_check = message->add_health_check(); + health_check->set_cluster_name("minkowski" + std::to_string(i)); + for (int j = 0; j < i + 2; j++) { + auto hc = health_check->add_health_checks(); + hc->mutable_timeout()->set_seconds(i); + hc->mutable_interval()->set_seconds(j); + hc->mutable_unhealthy_threshold()->set_value(j + 1); + hc->mutable_healthy_threshold()->set_value(j + 1); + hc->mutable_grpc_health_check(); + hc->mutable_http_health_check()->set_use_http2(false); + hc->mutable_http_health_check()->set_path("/healthcheck"); + } + } + + // Process message + EXPECT_CALL(test_factory_, createClusterInfo(_, _, _, _, _, _, _)) + .WillRepeatedly(Return(cluster_info_)); + + hds_delegate_friend_.processPrivateMessage(*hds_delegate_, std::move(message)); + + // Check Correctness + EXPECT_EQ(hds_delegate_->hdsClusters()[0]->healthCheckers().size(), 2); + EXPECT_EQ(hds_delegate_->hdsClusters()[1]->healthCheckers().size(), 3); +} + +// Tests OnReceiveMessage given a minimal HealthCheckSpecifier message +TEST_F(HdsTest, TestMinimalOnReceiveMessage) { + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + EXPECT_CALL(async_stream_, sendMessage(_, _)); + createHdsDelegate(); + + // Create Message + message.reset(new envoy::service::discovery::v2::HealthCheckSpecifier); + 
message->mutable_interval()->set_seconds(1); + + // Process message + EXPECT_CALL(*server_response_timer_, enableTimer(_)).Times(AtLeast(1)); + hds_delegate_->onReceiveMessage(std::move(message)); +} + +// Tests that SendResponse responds to the server in a timely fashion +// given a minimal HealthCheckSpecifier message +TEST_F(HdsTest, TestMinimalSendResponse) { + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + EXPECT_CALL(async_stream_, sendMessage(_, _)); + createHdsDelegate(); + + // Create Message + message.reset(new envoy::service::discovery::v2::HealthCheckSpecifier); + message->mutable_interval()->set_seconds(1); + + // Process message and send 2 responses + EXPECT_CALL(*server_response_timer_, enableTimer(_)).Times(AtLeast(1)); + EXPECT_CALL(async_stream_, sendMessage(_, _)).Times(2); + hds_delegate_->onReceiveMessage(std::move(message)); + hds_delegate_->sendResponse(); + server_response_timer_cb_(); +} + +TEST_F(HdsTest, TestStreamConnectionFailure) { + EXPECT_CALL(*async_client_, start(_, _)) + .WillOnce(Return(nullptr)) + .WillOnce(Return(&async_stream_)); + EXPECT_CALL(*retry_timer_, enableTimer(_)); + EXPECT_CALL(async_stream_, sendMessage(_, _)); + + // Test connection failure and retry + createHdsDelegate(); + retry_timer_cb_(); +} + +// TODO(lilika): Add unit tests for HdsDelegate::sendResponse() with healthy and +// unhealthy endpoints. 
+ +// Tests that SendResponse responds to the server correctly given +// a HealthCheckSpecifier message that contains a single endpoint +// which times out +TEST_F(HdsTest, TestSendResponseOneEndpointTimeout) { + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + EXPECT_CALL(async_stream_, sendMessage(_, _)); + createHdsDelegate(); + + // Create Message + message.reset(createSimpleMessage()); + + Network::MockClientConnection* connection_ = new NiceMock(); + EXPECT_CALL(dispatcher_, createClientConnection_(_, _, _, _)).WillRepeatedly(Return(connection_)); + EXPECT_CALL(*server_response_timer_, enableTimer(_)).Times(2); + EXPECT_CALL(async_stream_, sendMessage(_, false)); + EXPECT_CALL(test_factory_, createClusterInfo(_, _, _, _, _, _, _)) + .WillOnce(Return(cluster_info_)); + EXPECT_CALL(*connection_, setBufferLimits(_)); + EXPECT_CALL(dispatcher_, deferredDelete_(_)); + // Process message + hds_delegate_->onReceiveMessage(std::move(message)); + connection_->raiseEvent(Network::ConnectionEvent::Connected); + + // Send Response + auto msg = hds_delegate_->sendResponse(); + + // Correctness + EXPECT_EQ(msg.endpoint_health_response().endpoints_health(0).health_status(), + envoy::api::v2::core::HealthStatus::UNHEALTHY); + EXPECT_EQ(msg.endpoint_health_response() + .endpoints_health(0) + .endpoint() + .address() + .socket_address() + .address(), + "127.0.0.0"); + EXPECT_EQ(msg.endpoint_health_response() + .endpoints_health(0) + .endpoint() + .address() + .socket_address() + .port_value(), + 1234); +} + +} // namespace Upstream +} // namespace Envoy diff --git a/test/integration/BUILD b/test/integration/BUILD index fbcdf003a5c12..5165ef59314d6 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -379,10 +379,18 @@ envoy_cc_test( srcs = ["hds_integration_test.cc"], deps = [ ":http_integration_lib", + ":integration_lib", + "//include/envoy/upstream:upstream_interface", + "//source/common/config:metadata_lib", 
"//source/common/config:resources_lib", + "//source/common/json:config_schemas_lib", + "//source/common/json:json_loader_lib", + "//source/common/network:utility_lib", + "//source/common/upstream:health_checker_lib", + "//source/common/upstream:health_discovery_service_lib", + "//test/common/upstream:utility_lib", "//test/config:utility_lib", "//test/test_common:network_utility_lib", - "//test/test_common:utility_lib", "@envoy_api//envoy/api/v2:eds_cc", "@envoy_api//envoy/service/discovery/v2:hds_cc", ], diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index a81067790c473..aa36fac0edbec 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -585,4 +585,7 @@ class FakeUpstream : Logger::Loggable, FakeListener listener_; const Network::FilterChainSharedPtr filter_chain_; }; + +typedef std::unique_ptr FakeUpstreamPtr; + } // namespace Envoy diff --git a/test/integration/hds_integration_test.cc b/test/integration/hds_integration_test.cc index ae0cd1d189248..af3053cce249e 100644 --- a/test/integration/hds_integration_test.cc +++ b/test/integration/hds_integration_test.cc @@ -1,14 +1,21 @@ #include "envoy/api/v2/eds.pb.h" #include "envoy/api/v2/endpoint/endpoint.pb.h" #include "envoy/service/discovery/v2/hds.pb.h" +#include "envoy/upstream/upstream.h" +#include "common/config/metadata.h" #include "common/config/resources.h" +#include "common/network/utility.h" +#include "common/protobuf/utility.h" +#include "common/upstream/health_checker_impl.h" +#include "common/upstream/health_discovery_service.h" +#include "test/common/upstream/utility.h" #include "test/config/utility.h" #include "test/integration/http_integration.h" #include "test/test_common/network_utility.h" -#include "test/test_common/utility.h" +#include "gmock/gmock.h" #include "gtest/gtest.h" namespace Envoy { @@ -27,100 +34,435 @@ class HdsIntegrationTest : public HttpIntegrationTest, void initialize() override { 
setUpstreamCount(upstream_endpoints_); - config_helper_.addConfigModifier([this](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { + config_helper_.addConfigModifier([](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { // Setup hds and corresponding gRPC cluster. - auto* hds_confid = bootstrap.mutable_hds_config(); - hds_confid->set_api_type(envoy::api::v2::core::ApiConfigSource::GRPC); - hds_confid->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("hds_delegate"); + auto* hds_config = bootstrap.mutable_hds_config(); + hds_config->set_api_type(envoy::api::v2::core::ApiConfigSource::GRPC); + hds_config->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("hds_cluster"); auto* hds_cluster = bootstrap.mutable_static_resources()->add_clusters(); hds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); hds_cluster->mutable_circuit_breakers()->Clear(); - hds_cluster->set_name("hds_delegate"); + hds_cluster->set_name("hds_cluster"); hds_cluster->mutable_http2_protocol_options(); - // Switch predefined cluster_0 to EDS filesystem sourcing. - // TODO(lilika): Remove eds dependency auto* cluster_0 = bootstrap.mutable_static_resources()->mutable_clusters(0); cluster_0->mutable_hosts()->Clear(); - cluster_0->set_type(envoy::api::v2::Cluster::EDS); - auto* eds_cluster_config = cluster_0->mutable_eds_cluster_config(); - eds_cluster_config->mutable_eds_config()->set_path(eds_helper_.eds_path()); }); + HttpIntegrationTest::initialize(); - hds_upstream_ = fake_upstreams_[0].get(); - for (uint32_t i = 0; i < upstream_endpoints_; ++i) { - service_upstream_[i] = fake_upstreams_[i + 1].get(); - } + + // Endpoint connections + host_upstream_.reset(new FakeUpstream(0, FakeHttpConnection::Type::HTTP1, version_)); + host2_upstream_.reset(new FakeUpstream(0, FakeHttpConnection::Type::HTTP1, version_)); } + // Sets up a connection between Envoy and the management server. 
void waitForHdsStream() { AssertionResult result = - hds_upstream_->waitForHttpConnection(*dispatcher_, fake_hds_connection_); + hds_upstream_->waitForHttpConnection(*dispatcher_, hds_fake_connection_); RELEASE_ASSERT(result, result.message()); - result = fake_hds_connection_->waitForNewStream(*dispatcher_, hds_stream_); + result = hds_fake_connection_->waitForNewStream(*dispatcher_, hds_stream_); RELEASE_ASSERT(result, result.message()); } - void requestHealthCheckSpecifier() { - envoy::service::discovery::v2::HealthCheckSpecifier server_health_check_specifier; - server_health_check_specifier.mutable_interval()->set_nanos(500000000); // 500ms + // Envoy sends healthcheck messages to the endpoints + void healthcheckEndpoints(std::string cluster2 = "") { + ASSERT_TRUE(host_upstream_->waitForHttpConnection(*dispatcher_, host_fake_connection_)); + ASSERT_TRUE(host_fake_connection_->waitForNewStream(*dispatcher_, host_stream_)); + ASSERT_TRUE(host_stream_->waitForEndStream(*dispatcher_)); + + EXPECT_STREQ(host_stream_->headers().Path()->value().c_str(), "/healthcheck"); + EXPECT_STREQ(host_stream_->headers().Method()->value().c_str(), "GET"); + EXPECT_STREQ(host_stream_->headers().Host()->value().c_str(), "anna"); - hds_stream_->sendGrpcMessage(server_health_check_specifier); - // Wait until the request has been received by Envoy. 
- test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); + if (cluster2 != "") { + ASSERT_TRUE(host2_upstream_->waitForHttpConnection(*dispatcher_, host2_fake_connection_)); + ASSERT_TRUE(host2_fake_connection_->waitForNewStream(*dispatcher_, host2_stream_)); + ASSERT_TRUE(host2_stream_->waitForEndStream(*dispatcher_)); + + EXPECT_STREQ(host2_stream_->headers().Path()->value().c_str(), "/healthcheck"); + EXPECT_STREQ(host2_stream_->headers().Method()->value().c_str(), "GET"); + EXPECT_STREQ(host2_stream_->headers().Host()->value().c_str(), cluster2.c_str()); + } } + // Clean up the connection between Envoy and the management server void cleanupHdsConnection() { - if (fake_hds_connection_ != nullptr) { - AssertionResult result = fake_hds_connection_->close(); + if (hds_fake_connection_ != nullptr) { + AssertionResult result = hds_fake_connection_->close(); + RELEASE_ASSERT(result, result.message()); + result = hds_fake_connection_->waitForDisconnect(); + RELEASE_ASSERT(result, result.message()); + } + } + + // Clean up connections between Envoy and endpoints + void cleanupHostConnections() { + if (host_fake_connection_ != nullptr) { + AssertionResult result = host_fake_connection_->close(); + RELEASE_ASSERT(result, result.message()); + result = host_fake_connection_->waitForDisconnect(); + RELEASE_ASSERT(result, result.message()); + } + if (host2_fake_connection_ != nullptr) { + AssertionResult result = host2_fake_connection_->close(); RELEASE_ASSERT(result, result.message()); - result = fake_hds_connection_->waitForDisconnect(); + result = host2_fake_connection_->waitForDisconnect(); RELEASE_ASSERT(result, result.message()); } } - static constexpr uint32_t upstream_endpoints_ = 5; + // Creates a basic HealthCheckSpecifier message containing one endpoint and + // one health_check + envoy::service::discovery::v2::HealthCheckSpecifier makeHealthCheckSpecifier() { + envoy::service::discovery::v2::HealthCheckSpecifier server_health_check_specifier_; + 
server_health_check_specifier_.mutable_interval()->set_seconds(1); + + auto* health_check = server_health_check_specifier_.add_health_check(); + + health_check->set_cluster_name("anna"); + health_check->add_endpoints() + ->add_endpoints() + ->mutable_address() + ->mutable_socket_address() + ->set_address(host_upstream_->localAddress()->ip()->addressAsString()); + health_check->mutable_endpoints(0) + ->mutable_endpoints(0) + ->mutable_address() + ->mutable_socket_address() + ->set_port_value(host_upstream_->localAddress()->ip()->port()); + health_check->mutable_endpoints(0)->mutable_locality()->set_region("some_region"); + health_check->mutable_endpoints(0)->mutable_locality()->set_zone("some_zone"); + health_check->mutable_endpoints(0)->mutable_locality()->set_sub_zone("crete"); + + health_check->add_health_checks()->mutable_timeout()->set_seconds(1); + health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(1); + health_check->mutable_health_checks(0)->mutable_unhealthy_threshold()->set_value(2); + health_check->mutable_health_checks(0)->mutable_healthy_threshold()->set_value(2); + health_check->mutable_health_checks(0)->mutable_grpc_health_check(); + health_check->mutable_health_checks(0)->mutable_http_health_check()->set_use_http2(false); + health_check->mutable_health_checks(0)->mutable_http_health_check()->set_path("/healthcheck"); + + return server_health_check_specifier_; + } + + // Checks if Envoy reported the health status of an endpoint correctly + void checkEndpointHealthResponse(envoy::service::discovery::v2::EndpointHealth endpoint, + envoy::api::v2::core::HealthStatus healthy, + Network::Address::InstanceConstSharedPtr address) { - IntegrationStreamDecoderPtr response_; - std::string sub_zone_{"winter"}; - FakeHttpConnectionPtr fake_hds_connection_; + EXPECT_EQ(healthy, endpoint.health_status()); + EXPECT_EQ(address->ip()->port(), endpoint.endpoint().address().socket_address().port_value()); + 
EXPECT_EQ(address->ip()->addressAsString(), + endpoint.endpoint().address().socket_address().address()); + } + + // Checks if the cluster counters are correct + void checkCounters(int requests, int response_s, int successes, int failures) { + EXPECT_EQ(requests, test_server_->counter("hds_delegate.requests")->value()); + EXPECT_EQ(response_s, test_server_->counter("hds_delegate.responses")->value()); + EXPECT_EQ(successes, test_server_->counter("cluster.anna.health_check.success")->value()); + EXPECT_EQ(failures, test_server_->counter("cluster.anna.health_check.failure")->value()); + } + + static constexpr uint32_t upstream_endpoints_ = 0; + + FakeHttpConnectionPtr hds_fake_connection_; FakeStreamPtr hds_stream_; FakeUpstream* hds_upstream_{}; - FakeUpstream* service_upstream_[upstream_endpoints_]{}; uint32_t hds_requests_{}; - EdsHelper eds_helper_; + FakeUpstreamPtr host_upstream_{}; + FakeUpstreamPtr host2_upstream_{}; + FakeStreamPtr host_stream_; + FakeStreamPtr host2_stream_; + FakeHttpConnectionPtr host_fake_connection_; + FakeHttpConnectionPtr host2_fake_connection_; + + envoy::service::discovery::v2::HealthCheckRequest envoy_msg_; + envoy::service::discovery::v2::HealthCheckRequestOrEndpointHealthResponse response_; + envoy::service::discovery::v2::HealthCheckSpecifier server_health_check_specifier_; }; INSTANTIATE_TEST_CASE_P(IpVersions, HdsIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), TestUtility::ipTestParamsToString); -// Test connectivity of Envoy and the Server -TEST_P(HdsIntegrationTest, Simple) { +// Tests Envoy healthchecking a single healthy endpoint and reporting that it is +// indeed healthy to the server. 
+TEST_P(HdsIntegrationTest, SingleEndpointHealthy) { initialize(); - envoy::service::discovery::v2::HealthCheckRequest envoy_msg; - envoy::service::discovery::v2::HealthCheckRequest envoy_msg_2; - envoy::service::discovery::v2::HealthCheckSpecifier server_health_check_specifier; - server_health_check_specifier.mutable_interval()->set_nanos(500000000); // 500ms // Server <--> Envoy - ASSERT_TRUE(hds_upstream_->waitForHttpConnection(*dispatcher_, fake_hds_connection_)); - ASSERT_TRUE(fake_hds_connection_->waitForNewStream(*dispatcher_, hds_stream_)); - ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, envoy_msg)); + waitForHdsStream(); + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, envoy_msg_)); - EXPECT_EQ(0, test_server_->counter("hds_delegate.requests")->value()); - EXPECT_EQ(1, test_server_->counter("hds_delegate.responses")->value()); + // Server asks for healthchecking + server_health_check_specifier_ = makeHealthCheckSpecifier(); + hds_stream_->startGrpcStream(); + hds_stream_->sendGrpcMessage(server_health_check_specifier_); + test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); + + // Envoy sends a healthcheck message to an endpoint + healthcheckEndpoints(); + + // Endpoint responds to the healthcheck + host_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + host_stream_->encodeData(1024, true); + + // Envoy reports back to server + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + + // Check that the response_ is correct + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), + envoy::api::v2::core::HealthStatus::HEALTHY, + host_upstream_->localAddress()); + checkCounters(1, 2, 1, 0); + + // Clean up connections + cleanupHostConnections(); + cleanupHdsConnection(); +} + +// Tests Envoy healthchecking a single endpoint that times out and reporting +// that it is unhealthy to the server. 
+TEST_P(HdsIntegrationTest, SingleEndpointTimeout) { + initialize(); + server_health_check_specifier_ = makeHealthCheckSpecifier(); + + // Server <--> Envoy + waitForHdsStream(); + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, envoy_msg_)); - // Send a message to Envoy, and wait until it's received + // Server asks for healthchecking hds_stream_->startGrpcStream(); - hds_stream_->sendGrpcMessage(server_health_check_specifier); + hds_stream_->sendGrpcMessage(server_health_check_specifier_); test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); - // Wait for Envoy to reply - ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, envoy_msg_2)); + // Envoy sends a healthcheck message to an endpoint + healthcheckEndpoints(); + + // Endpoint doesn't respond to the healthcheck + + // Envoy reports back to server + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + + // Check that the response_ is correct + // TODO(lilika): Ideally this would be envoy::api::v2::core::HealthStatus::TIMEOUT + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), + envoy::api::v2::core::HealthStatus::UNHEALTHY, + host_upstream_->localAddress()); + checkCounters(1, 2, 0, 1); + + // Clean up connections + cleanupHostConnections(); + cleanupHdsConnection(); +} + +// Tests Envoy healthchecking a single unhealthy endpoint and reporting that it is +// indeed unhealthy to the server.
+TEST_P(HdsIntegrationTest, SingleEndpointUnhealthy) { + initialize(); + server_health_check_specifier_ = makeHealthCheckSpecifier(); + + // Server <--> Envoy + waitForHdsStream(); + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, envoy_msg_)); + + // Server asks for healthchecking + hds_stream_->startGrpcStream(); + hds_stream_->sendGrpcMessage(server_health_check_specifier_); + test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); + + // Envoy sends a healthcheck message to an endpoint + healthcheckEndpoints(); + + // Endpoint responds to the healthcheck + host_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "404"}}, false); + host_stream_->encodeData(1024, true); + + // Envoy reports back to server + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + + // Check that the response_ is correct + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), + envoy::api::v2::core::HealthStatus::UNHEALTHY, + host_upstream_->localAddress()); + checkCounters(1, 2, 0, 1); + + // Clean up connections + cleanupHostConnections(); + cleanupHdsConnection(); +} + +// Tests that Envoy can healthcheck two hosts that are in the same cluster, and +// the same locality and report back the correct health statuses. 
+TEST_P(HdsIntegrationTest, TwoEndpointsSameLocality) { + initialize(); + + server_health_check_specifier_ = makeHealthCheckSpecifier(); + auto* endpoint = server_health_check_specifier_.mutable_health_check(0)->mutable_endpoints(0); + endpoint->add_endpoints()->mutable_address()->mutable_socket_address()->set_address( + host2_upstream_->localAddress()->ip()->addressAsString()); + endpoint->mutable_endpoints(1)->mutable_address()->mutable_socket_address()->set_port_value( + host2_upstream_->localAddress()->ip()->port()); + + // Server <--> Envoy + waitForHdsStream(); + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, envoy_msg_)); + + // Server asks for healthchecking + hds_stream_->startGrpcStream(); + hds_stream_->sendGrpcMessage(server_health_check_specifier_); + test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); + + healthcheckEndpoints("anna"); + + // Endpoints respond to the healthcheck + host_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "404"}}, false); + host_stream_->encodeData(1024, true); + host2_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + host2_stream_->encodeData(1024, true); + + // Envoy reports back to server + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + + // Check that the response_ is correct + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), + envoy::api::v2::core::HealthStatus::UNHEALTHY, + host_upstream_->localAddress()); + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(1), + envoy::api::v2::core::HealthStatus::HEALTHY, + host2_upstream_->localAddress()); + checkCounters(1, 2, 1, 1); + + // Clean up connections + cleanupHostConnections(); + cleanupHdsConnection(); +} + +// Tests that Envoy can healthcheck two hosts that are in the same cluster, and +// different localities and report back the correct health statuses.
+TEST_P(HdsIntegrationTest, TwoEndpointsDifferentLocality) { + initialize(); + server_health_check_specifier_ = makeHealthCheckSpecifier(); + + // Add endpoint + auto* health_check = server_health_check_specifier_.mutable_health_check(0); + + health_check->add_endpoints() + ->add_endpoints() + ->mutable_address() + ->mutable_socket_address() + ->set_address(host2_upstream_->localAddress()->ip()->addressAsString()); + health_check->mutable_endpoints(1) + ->mutable_endpoints(0) + ->mutable_address() + ->mutable_socket_address() + ->set_port_value(host2_upstream_->localAddress()->ip()->port()); + health_check->mutable_endpoints(1)->mutable_locality()->set_region("different_region"); + health_check->mutable_endpoints(1)->mutable_locality()->set_zone("different_zone"); + health_check->mutable_endpoints(1)->mutable_locality()->set_sub_zone("emplisi"); + + // Server <--> Envoy + waitForHdsStream(); + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, envoy_msg_)); + + // Server asks for healthchecking + hds_stream_->startGrpcStream(); + hds_stream_->sendGrpcMessage(server_health_check_specifier_); + test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); + + // Envoy sends healthcheck messages to two endpoints + healthcheckEndpoints("anna"); + + // Endpoint responds to the healthcheck + host_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "404"}}, false); + host_stream_->encodeData(1024, true); + host2_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + host2_stream_->encodeData(1024, true); + + // Envoy reports back to server + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); + + // Check that the response_ is correct + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), + envoy::api::v2::core::HealthStatus::UNHEALTHY, + host_upstream_->localAddress()); + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(1), + 
envoy::api::v2::core::HealthStatus::HEALTHY, + host2_upstream_->localAddress()); + checkCounters(1, 2, 1, 1); + + // Clean up connections + cleanupHostConnections(); + cleanupHdsConnection(); +} + +// Tests that Envoy can healthcheck two hosts that are in different clusters, and +// report back the correct health statuses. +TEST_P(HdsIntegrationTest, TwoEndpointsDifferentClusters) { + initialize(); + server_health_check_specifier_ = makeHealthCheckSpecifier(); + + // Add endpoint + auto* health_check = server_health_check_specifier_.add_health_check(); + + health_check->set_cluster_name("cat"); + health_check->add_endpoints() + ->add_endpoints() + ->mutable_address() + ->mutable_socket_address() + ->set_address(host2_upstream_->localAddress()->ip()->addressAsString()); + health_check->mutable_endpoints(0) + ->mutable_endpoints(0) + ->mutable_address() + ->mutable_socket_address() + ->set_port_value(host2_upstream_->localAddress()->ip()->port()); + health_check->mutable_endpoints(0)->mutable_locality()->set_region("peculiar_region"); + health_check->mutable_endpoints(0)->mutable_locality()->set_zone("peculiar_zone"); + health_check->mutable_endpoints(0)->mutable_locality()->set_sub_zone("paris"); + + health_check->add_health_checks()->mutable_timeout()->set_seconds(1); + health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(1); + health_check->mutable_health_checks(0)->mutable_unhealthy_threshold()->set_value(2); + health_check->mutable_health_checks(0)->mutable_healthy_threshold()->set_value(2); + health_check->mutable_health_checks(0)->mutable_grpc_health_check(); + health_check->mutable_health_checks(0)->mutable_http_health_check()->set_use_http2(false); + health_check->mutable_health_checks(0)->mutable_http_health_check()->set_path("/healthcheck"); + + // Server <--> Envoy + waitForHdsStream(); + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, envoy_msg_)); + + // Server asks for healthchecking + hds_stream_->startGrpcStream(); + 
hds_stream_->sendGrpcMessage(server_health_check_specifier_); + test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); + + // Envoy sends healthcheck messages to two endpoints + healthcheckEndpoints("cat"); + + // Endpoint responds to the healthcheck + host_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "404"}}, false); + host_stream_->encodeData(1024, true); + host2_stream_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + host2_stream_->encodeData(1024, true); + + // Envoy reports back to server + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, response_)); - EXPECT_EQ(1, test_server_->counter("hds_delegate.requests")->value()); - EXPECT_EQ(2, test_server_->counter("hds_delegate.responses")->value()); + // Check that the response_ is correct + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(0), + envoy::api::v2::core::HealthStatus::UNHEALTHY, + host_upstream_->localAddress()); + checkEndpointHealthResponse(response_.endpoint_health_response().endpoints_health(1), + envoy::api::v2::core::HealthStatus::HEALTHY, + host2_upstream_->localAddress()); + checkCounters(1, 2, 0, 1); + EXPECT_EQ(1, test_server_->counter("cluster.cat.health_check.success")->value()); + EXPECT_EQ(0, test_server_->counter("cluster.cat.health_check.failure")->value()); + // Clean up connections + cleanupHostConnections(); cleanupHdsConnection(); } diff --git a/test/mocks/upstream/BUILD b/test/mocks/upstream/BUILD index 6e05fd6689ff9..c23d76e9ed2aa 100644 --- a/test/mocks/upstream/BUILD +++ b/test/mocks/upstream/BUILD @@ -45,6 +45,7 @@ envoy_cc_mock( "//include/envoy/upstream:health_checker_interface", "//include/envoy/upstream:load_balancer_interface", "//include/envoy/upstream:upstream_interface", + "//source/common/upstream:health_discovery_service_lib", "//source/common/upstream:upstream_lib", "//test/mocks/config:config_mocks", "//test/mocks/grpc:grpc_mocks", diff --git 
a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index 55a946509f30f..bfdbd73624fa9 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -12,6 +12,7 @@ #include "envoy/upstream/upstream.h" #include "common/common/callback_impl.h" +#include "common/upstream/health_discovery_service.h" #include "common/upstream/upstream_impl.h" #include "test/mocks/config/mocks.h" @@ -273,5 +274,15 @@ class MockClusterUpdateCallbacks : public ClusterUpdateCallbacks { MOCK_METHOD1(onClusterRemoval, void(const std::string& cluster_name)); }; +class MockClusterInfoFactory : public ClusterInfoFactory, Logger::Loggable { +public: + MOCK_METHOD7( + createClusterInfo, + ClusterInfoConstSharedPtr(Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster, + const envoy::api::v2::core::BindConfig& bind_config, + Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, + Secret::SecretManager& secret_manager, bool added_via_api)); +}; + } // namespace Upstream } // namespace Envoy From 50af2171d6846db07dbb6f7e890ca9e513225cfd Mon Sep 17 00:00:00 2001 From: Lilika Markatou Date: Mon, 6 Aug 2018 17:46:28 -0400 Subject: [PATCH 2/3] Fixing the conflict Signed-off-by: Lilika Markatou --- include/envoy/upstream/cluster_manager.h | 5 +- source/common/upstream/BUILD | 4 ++ .../upstream/health_discovery_service.cc | 64 +++++++++++++------ .../upstream/health_discovery_service.h | 27 +++++--- source/server/server.cc | 4 +- test/common/upstream/BUILD | 2 +- test/common/upstream/hds_test.cc | 13 ++-- test/mocks/upstream/mocks.h | 13 ++-- 8 files changed, 87 insertions(+), 45 deletions(-) diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 07c93b67e48a5..6280e2e2f3cd4 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -316,8 +316,9 @@ class ClusterInfoFactory { virtual Upstream::ClusterInfoConstSharedPtr createClusterInfo(Runtime::Loader& runtime, const 
envoy::api::v2::Cluster& cluster, const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats, - Ssl::ContextManager& ssl_context_manager, Secret::SecretManager& secret_manager, - bool added_via_api) PURE; + Ssl::ContextManager& ssl_context_manager, bool added_via_api, + ClusterManager& cm, const LocalInfo::LocalInfo& local_info, + Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random) PURE; }; } // namespace Upstream diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index f13b4b490ba34..5f4579bac279e 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -170,13 +170,17 @@ envoy_cc_library( ":upstream_includes", "//include/envoy/event:dispatcher_interface", "//include/envoy/runtime:runtime_interface", + "//include/envoy/server:transport_socket_config_interface", "//include/envoy/ssl:context_manager_interface", "//include/envoy/stats:stats_macros", "//include/envoy/upstream:cluster_manager_interface", "//include/envoy/upstream:upstream_interface", "//source/common/common:minimal_logger_lib", + "//source/common/config:utility_lib", "//source/common/grpc:async_client_lib", "//source/common/network:resolver_lib", + "//source/extensions/transport_sockets:well_known_names", + "//source/server:transport_socket_config_lib", "@envoy_api//envoy/service/discovery/v2:hds_cc", ], ) diff --git a/source/common/upstream/health_discovery_service.cc b/source/common/upstream/health_discovery_service.cc index 1a5fcd76d5dd5..387e61ebfc294 100644 --- a/source/common/upstream/health_discovery_service.cc +++ b/source/common/upstream/health_discovery_service.cc @@ -8,17 +8,17 @@ namespace Upstream { HdsDelegate::HdsDelegate(const envoy::api::v2::core::Node& node, Stats::Scope& scope, Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher, Runtime::Loader& runtime, Envoy::Stats::Store& stats, - Ssl::ContextManager& ssl_context_manager, - Secret::SecretManager& secret_manager, Runtime::RandomGenerator& random, 
+ Ssl::ContextManager& ssl_context_manager, Runtime::RandomGenerator& random, ClusterInfoFactory& info_factory, - AccessLog::AccessLogManager& access_log_manager) + AccessLog::AccessLogManager& access_log_manager, ClusterManager& cm, + const LocalInfo::LocalInfo& local_info) : stats_{ALL_HDS_STATS(POOL_COUNTER_PREFIX(scope, "hds_delegate."))}, service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v2.HealthDiscoveryService.StreamHealthCheck")), async_client_(std::move(async_client)), dispatcher_(dispatcher), runtime_(runtime), - store_stats(stats), ssl_context_manager_(ssl_context_manager), - secret_manager_(secret_manager), random_(random), info_factory_(info_factory), - access_log_manager_(access_log_manager) { + store_stats(stats), ssl_context_manager_(ssl_context_manager), random_(random), + info_factory_(info_factory), access_log_manager_(access_log_manager), cm_(cm), + local_info_(local_info) { health_check_request_.mutable_node()->MergeFrom(node); hds_retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); }); hds_stream_response_timer_ = dispatcher.createTimer([this]() -> void { sendResponse(); }); @@ -125,8 +125,8 @@ void HdsDelegate::processMessage( // Create HdsCluster hds_clusters_.emplace_back(new HdsCluster(runtime_, cluster_config, bind_config, store_stats, - ssl_context_manager_, secret_manager_, false, - info_factory_)); + ssl_context_manager_, false, info_factory_, cm_, + local_info_, dispatcher_, random_)); hds_clusters_.back()->startHealthchecks(access_log_manager_, runtime_, random_, dispatcher_); } @@ -160,17 +160,19 @@ void HdsDelegate::onRemoteClose(Grpc::Status::GrpcStatus status, const std::stri HdsCluster::HdsCluster(Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster, const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats, - Ssl::ContextManager& ssl_context_manager, - Secret::SecretManager& secret_manager, bool added_via_api, - 
ClusterInfoFactory& info_factory) + Ssl::ContextManager& ssl_context_manager, bool added_via_api, + ClusterInfoFactory& info_factory, ClusterManager& cm, + const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, + Runtime::RandomGenerator& random) : runtime_(runtime), cluster_(cluster), bind_config_(bind_config), stats_(stats), - ssl_context_manager_(ssl_context_manager), secret_manager_(secret_manager), - added_via_api_(added_via_api), initial_hosts_(new HostVector()) { + ssl_context_manager_(ssl_context_manager), added_via_api_(added_via_api), + initial_hosts_(new HostVector()) { ENVOY_LOG(debug, "Creating an HdsCluster"); priority_set_.getOrCreateHostSet(0); - info_ = info_factory.createClusterInfo(runtime_, cluster_, bind_config_, stats_, - ssl_context_manager_, secret_manager_, added_via_api_); + info_ = + info_factory.createClusterInfo(runtime_, cluster_, bind_config_, stats_, ssl_context_manager_, + added_via_api_, cm, local_info, dispatcher, random); for (const auto& host : cluster.hosts()) { initial_hosts_->emplace_back( @@ -197,11 +199,35 @@ HostVectorConstSharedPtr HdsCluster::createHealthyHostList(const HostVector& hos ClusterInfoConstSharedPtr ProdClusterInfoFactory::createClusterInfo( Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster, const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats, - Ssl::ContextManager& ssl_context_manager, Secret::SecretManager& secret_manager, - bool added_via_api) { + Ssl::ContextManager& ssl_context_manager, bool added_via_api, ClusterManager& cm, + const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, + Runtime::RandomGenerator& random) { + + Envoy::Stats::ScopePtr scope = stats.createScope(fmt::format("cluster.{}.", cluster.name())); + + Envoy::Server::Configuration::TransportSocketFactoryContextImpl factory_context( + ssl_context_manager, *scope, cm, local_info, dispatcher, random, stats); + + auto transport_socket = cluster.transport_socket(); + if 
(!cluster.has_transport_socket()) { + if (cluster.has_tls_context()) { + transport_socket.set_name(Extensions::TransportSockets::TransportSocketNames::get().Tls); + MessageUtil::jsonConvert(cluster.tls_context(), *transport_socket.mutable_config()); + } else { + transport_socket.set_name( + Extensions::TransportSockets::TransportSocketNames::get().RawBuffer); + } + } + + auto& config_factory = Config::Utility::getAndCheckFactory< + Server::Configuration::UpstreamTransportSocketConfigFactory>(transport_socket.name()); + ProtobufTypes::MessagePtr message = + Config::Utility::translateToFactoryConfig(transport_socket, config_factory); + Network::TransportSocketFactoryPtr socket_factory = + config_factory.createTransportSocketFactory(*message, factory_context); - return std::make_unique(cluster, bind_config, runtime, stats, - ssl_context_manager, secret_manager, added_via_api); + return std::make_unique(cluster, bind_config, runtime, std::move(socket_factory), + std::move(scope), added_via_api); } void HdsCluster::startHealthchecks(AccessLog::AccessLogManager& access_log_manager, diff --git a/source/common/upstream/health_discovery_service.h b/source/common/upstream/health_discovery_service.h index f167a20324fb8..21bf28f272f4e 100644 --- a/source/common/upstream/health_discovery_service.h +++ b/source/common/upstream/health_discovery_service.h @@ -1,17 +1,23 @@ #pragma once #include "envoy/event/dispatcher.h" +#include "envoy/server/transport_socket_config.h" #include "envoy/service/discovery/v2/hds.pb.h" #include "envoy/ssl/context_manager.h" #include "envoy/stats/stats_macros.h" #include "envoy/upstream/upstream.h" #include "common/common/logger.h" +#include "common/config/utility.h" #include "common/grpc/async_client_impl.h" #include "common/network/resolver_impl.h" #include "common/upstream/health_checker_impl.h" #include "common/upstream/upstream_impl.h" +#include "server/transport_socket_config_impl.h" + +#include "extensions/transport_sockets/well_known_names.h" 
+ namespace Envoy { namespace Upstream { @@ -20,8 +26,9 @@ class ProdClusterInfoFactory : public ClusterInfoFactory, Logger::Loggable { static ClusterSharedPtr create(); HdsCluster(Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster, const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats, - Ssl::ContextManager& ssl_context_manager, Secret::SecretManager& secret_manager, - bool added_via_api, ClusterInfoFactory& info_factory); + Ssl::ContextManager& ssl_context_manager, bool added_via_api, + ClusterInfoFactory& info_factory, ClusterManager& cm, + const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, + Runtime::RandomGenerator& random); // From Upstream::Cluster InitializePhase initializePhase() const override { return InitializePhase::Primary; } @@ -72,7 +81,6 @@ class HdsCluster : public Cluster, Logger::Loggable { const envoy::api::v2::core::BindConfig& bind_config_; Stats::Store& stats_; Ssl::ContextManager& ssl_context_manager_; - Secret::SecretManager& secret_manager_; bool added_via_api_; HostVectorSharedPtr initial_hosts_; @@ -113,9 +121,9 @@ class HdsDelegate HdsDelegate(const envoy::api::v2::core::Node& node, Stats::Scope& scope, Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher, Runtime::Loader& runtime, Envoy::Stats::Store& stats, - Ssl::ContextManager& ssl_context_manager, Secret::SecretManager& secret_manager, - Runtime::RandomGenerator& random, ClusterInfoFactory& info_factory, - AccessLog::AccessLogManager& access_log_manager); + Ssl::ContextManager& ssl_context_manager, Runtime::RandomGenerator& random, + ClusterInfoFactory& info_factory, AccessLog::AccessLogManager& access_log_manager, + ClusterManager& cm, const LocalInfo::LocalInfo& local_info); // Grpc::TypedAsyncStreamCallbacks void onCreateInitialMetadata(Http::HeaderMap& metadata) override; @@ -148,10 +156,11 @@ class HdsDelegate Runtime::Loader& runtime_; Envoy::Stats::Store& store_stats; Ssl::ContextManager& ssl_context_manager_; 
- Secret::SecretManager& secret_manager_; Runtime::RandomGenerator& random_; ClusterInfoFactory& info_factory_; AccessLog::AccessLogManager& access_log_manager_; + ClusterManager& cm_; + const LocalInfo::LocalInfo& local_info_; envoy::service::discovery::v2::HealthCheckRequest health_check_request_; std::unique_ptr health_check_message_; diff --git a/source/server/server.cc b/source/server/server.cc index 896359fc7ea66..8511d090f796d 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -289,8 +289,8 @@ void InstanceImpl::initialize(Options& options, bootstrap_.node(), stats(), Config::Utility::factoryForGrpcApiConfigSource(*async_client_manager_, hds_config, stats()) ->create(), - dispatcher(), runtime(), stats(), sslContextManager(), secretManager(), random(), - info_factory_, access_log_manager_)); + dispatcher(), runtime(), stats(), sslContextManager(), random(), info_factory_, + access_log_manager_, clusterManager(), localInfo())); } for (Stats::SinkPtr& sink : main_config->statsSinks()) { diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index 357c286b10606..befa3fa0df8e1 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -167,11 +167,11 @@ envoy_cc_test( srcs = ["hds_test.cc"], deps = [ "//source/common/ssl:context_lib", - "//source/common/stats:stats_lib", "//source/common/upstream:health_discovery_service_lib", "//test/mocks/access_log:access_log_mocks", "//test/mocks/event:event_mocks", "//test/mocks/grpc:grpc_mocks", + "//test/mocks/local_info:local_info_mocks", "//test/mocks/network:network_mocks", "//test/mocks/upstream:upstream_mocks", "//test/test_common:utility_lib", diff --git a/test/common/upstream/hds_test.cc b/test/common/upstream/hds_test.cc index b175e525433e3..9d5a628dac404 100644 --- a/test/common/upstream/hds_test.cc +++ b/test/common/upstream/hds_test.cc @@ -1,12 +1,12 @@ #include "envoy/service/discovery/v2/hds.pb.h" #include "common/ssl/context_manager_impl.h" -#include 
"common/stats/stats_impl.h" #include "common/upstream/health_discovery_service.h" #include "test/mocks/access_log/mocks.h" #include "test/mocks/event/mocks.h" #include "test/mocks/grpc/mocks.h" +#include "test/mocks/local_info/mocks.h" #include "test/mocks/network/mocks.h" #include "test/mocks/upstream/mocks.h" #include "test/test_common/utility.h" @@ -59,7 +59,7 @@ class HdsTest : public testing::Test { })); hds_delegate_.reset(new HdsDelegate(node_, stats_store_, Grpc::AsyncClientPtr(async_client_), dispatcher_, runtime_, stats_store_, ssl_context_manager_, - secret_manager_, random_, test_factory_, log_manager_)); + random_, test_factory_, log_manager_, cm_, local_info_)); } // Creates a HealthCheckSpecifier message that contains one endpoint and one @@ -107,9 +107,10 @@ class HdsTest : public testing::Test { Grpc::MockAsyncClient* async_client_; Runtime::MockLoader runtime_; Ssl::ContextManagerImpl ssl_context_manager_{runtime_}; - Secret::MockSecretManager secret_manager_; NiceMock random_; NiceMock log_manager_; + NiceMock cm_; + NiceMock local_info_; }; // Test if processMessage processes endpoints from a HealthCheckSpecifier @@ -136,7 +137,7 @@ TEST_F(HdsTest, TestProcessMessageEndpoints) { } // Process message - EXPECT_CALL(test_factory_, createClusterInfo(_, _, _, _, _, _, _)).Times(2); + EXPECT_CALL(test_factory_, createClusterInfo(_, _, _, _, _, _, _, _, _, _)).Times(2); hds_delegate_friend_.processPrivateMessage(*hds_delegate_, std::move(message)); // Check Correctness @@ -179,7 +180,7 @@ TEST_F(HdsTest, TestProcessMessageHealthChecks) { } // Process message - EXPECT_CALL(test_factory_, createClusterInfo(_, _, _, _, _, _, _)) + EXPECT_CALL(test_factory_, createClusterInfo(_, _, _, _, _, _, _, _, _, _)) .WillRepeatedly(Return(cluster_info_)); hds_delegate_friend_.processPrivateMessage(*hds_delegate_, std::move(message)); @@ -253,7 +254,7 @@ TEST_F(HdsTest, TestSendResponseOneEndpointTimeout) { EXPECT_CALL(dispatcher_, createClientConnection_(_, _, _, 
_)).WillRepeatedly(Return(connection_)); EXPECT_CALL(*server_response_timer_, enableTimer(_)).Times(2); EXPECT_CALL(async_stream_, sendMessage(_, false)); - EXPECT_CALL(test_factory_, createClusterInfo(_, _, _, _, _, _, _)) + EXPECT_CALL(test_factory_, createClusterInfo(_, _, _, _, _, _, _, _, _, _)) .WillOnce(Return(cluster_info_)); EXPECT_CALL(*connection_, setBufferLimits(_)); EXPECT_CALL(dispatcher_, deferredDelete_(_)); diff --git a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index bfdbd73624fa9..08e5cd394fb30 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -276,12 +276,13 @@ class MockClusterUpdateCallbacks : public ClusterUpdateCallbacks { class MockClusterInfoFactory : public ClusterInfoFactory, Logger::Loggable { public: - MOCK_METHOD7( - createClusterInfo, - ClusterInfoConstSharedPtr(Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster, - const envoy::api::v2::core::BindConfig& bind_config, - Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, - Secret::SecretManager& secret_manager, bool added_via_api)); + MOCK_METHOD10(createClusterInfo, + ClusterInfoConstSharedPtr( + Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster, + const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats, + Ssl::ContextManager& ssl_context_manager, bool added_via_api, + ClusterManager& cm, const LocalInfo::LocalInfo& local_info, + Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random)); }; } // namespace Upstream From 84230526b0d81d9e55070c7ec5f49ca312beeefd Mon Sep 17 00:00:00 2001 From: Lilika Markatou Date: Tue, 7 Aug 2018 11:55:29 -0400 Subject: [PATCH 3/3] Addressing comments Signed-off-by: Lilika Markatou --- .../common/upstream/health_discovery_service.cc | 17 +---------------- source/common/upstream/upstream_impl.cc | 4 ++-- source/common/upstream/upstream_impl.h | 9 +++++++++ 3 files changed, 12 insertions(+), 18 deletions(-) diff --git 
a/source/common/upstream/health_discovery_service.cc b/source/common/upstream/health_discovery_service.cc index 387e61ebfc294..1412493081e29 100644 --- a/source/common/upstream/health_discovery_service.cc +++ b/source/common/upstream/health_discovery_service.cc @@ -208,23 +208,8 @@ ClusterInfoConstSharedPtr ProdClusterInfoFactory::createClusterInfo( Envoy::Server::Configuration::TransportSocketFactoryContextImpl factory_context( ssl_context_manager, *scope, cm, local_info, dispatcher, random, stats); - auto transport_socket = cluster.transport_socket(); - if (!cluster.has_transport_socket()) { - if (cluster.has_tls_context()) { - transport_socket.set_name(Extensions::TransportSockets::TransportSocketNames::get().Tls); - MessageUtil::jsonConvert(cluster.tls_context(), *transport_socket.mutable_config()); - } else { - transport_socket.set_name( - Extensions::TransportSockets::TransportSocketNames::get().RawBuffer); - } - } - - auto& config_factory = Config::Utility::getAndCheckFactory< - Server::Configuration::UpstreamTransportSocketConfigFactory>(transport_socket.name()); - ProtobufTypes::MessagePtr message = - Config::Utility::translateToFactoryConfig(transport_socket, config_factory); Network::TransportSocketFactoryPtr socket_factory = - config_factory.createTransportSocketFactory(*message, factory_context); + Upstream::createTransportSocketFactory(cluster, factory_context); return std::make_unique(cluster, bind_config, runtime, std::move(socket_factory), std::move(scope), added_via_api); diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 83117cf59c5fb..0e56beca9dd6f 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -341,6 +341,8 @@ Stats::ScopePtr generateStatsScope(const envoy::api::v2::Cluster& config, Stats: : std::string(config.alt_stat_name()))); } +} // namespace + Network::TransportSocketFactoryPtr createTransportSocketFactory( const 
envoy::api::v2::Cluster& config, Server::Configuration::TransportSocketFactoryContext& factory_context) { @@ -365,8 +367,6 @@ Network::TransportSocketFactoryPtr createTransportSocketFactory( return config_factory.createTransportSocketFactory(*message, factory_context); } -} // namespace - ClusterSharedPtr ClusterImplBase::create( const envoy::api::v2::Cluster& cluster, ClusterManager& cm, Stats::Store& stats, ThreadLocal::Instance& tls, Network::DnsResolverSharedPtr dns_resolver, diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index cbd158034fc7a..e29c0b851c536 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -425,6 +425,15 @@ class ClusterInfoImpl : public ClusterInfo { const bool drain_connections_on_host_removal_; }; +/** + * Function that creates a Network::TransportSocketFactoryPtr + * given a cluster configuration and transport socket factory + * context. + */ +Network::TransportSocketFactoryPtr +createTransportSocketFactory(const envoy::api::v2::Cluster& config, + Server::Configuration::TransportSocketFactoryContext& factory_context); + /** * Base class all primary clusters. */