Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/envoy/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ envoy_cc_library(
"//include/envoy/http:codec_interface",
"//include/envoy/network:connection_interface",
"//include/envoy/network:transport_socket_interface",
"//include/envoy/runtime:runtime_interface",
"//include/envoy/ssl:context_interface",
"//include/envoy/ssl:context_manager_interface",
],
)
28 changes: 28 additions & 0 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "envoy/runtime/runtime.h"
#include "envoy/secret/secret_manager.h"
#include "envoy/server/admin.h"
#include "envoy/ssl/context_manager.h"
#include "envoy/stats/store.h"
#include "envoy/tcp/conn_pool.h"
#include "envoy/upstream/health_checker.h"
Expand Down Expand Up @@ -294,5 +295,32 @@ class ClusterManagerFactory {
virtual Secret::SecretManager& secretManager() PURE;
};

/**
 * Factory interface for creating ClusterInfo objects. Allows the concrete
 * construction (see ProdClusterInfoFactory) to be swapped out, e.g. in tests.
 */
class ClusterInfoFactory {
public:
  virtual ~ClusterInfoFactory() {}

  /**
   * This method returns a Upstream::ClusterInfoConstSharedPtr given a cluster
   * configuration.
   *
   * @param runtime supplies the runtime loader.
   * @param cluster supplies the owning cluster.
   * @param bind_config supplies information on binding newly established connections.
   * @param stats supplies a store for all known counters, gauges, and timers.
   * @param ssl_context_manager supplies a manager for all SSL contexts.
   * @param added_via_api denotes whether this was added via API.
   * @param cm supplies the cluster manager.
   * @param local_info supplies the local info.
   * @param dispatcher supplies the event dispatcher.
   * @param random supplies a random number generator.
   * @return Upstream::ClusterInfoConstSharedPtr
   */
  virtual Upstream::ClusterInfoConstSharedPtr
  createClusterInfo(Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster,
                    const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats,
                    Ssl::ContextManager& ssl_context_manager, bool added_via_api,
                    ClusterManager& cm, const LocalInfo::LocalInfo& local_info,
                    Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random) PURE;
};

} // namespace Upstream
} // namespace Envoy
11 changes: 11 additions & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,21 @@ envoy_cc_library(
srcs = ["health_discovery_service.cc"],
hdrs = ["health_discovery_service.h"],
deps = [
":health_checker_lib",
":upstream_includes",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/runtime:runtime_interface",
"//include/envoy/server:transport_socket_config_interface",
"//include/envoy/ssl:context_manager_interface",
"//include/envoy/stats:stats_macros",
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/common:minimal_logger_lib",
"//source/common/config:utility_lib",
"//source/common/grpc:async_client_lib",
"//source/common/network:resolver_lib",
"//source/extensions/transport_sockets:well_known_names",
"//source/server:transport_socket_config_lib",
"@envoy_api//envoy/service/discovery/v2:hds_cc",
],
)
Expand Down
208 changes: 189 additions & 19 deletions source/common/upstream/health_discovery_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,31 @@ namespace Envoy {
namespace Upstream {

// Delegate that drives the HDS gRPC stream: registers this Envoy with the
// management server and kicks off stream establishment immediately.
//
// NOTE(review): the captured span interleaved the pre-change and post-change
// diff lines (duplicate signature line, duplicate async_client_ initializer,
// and the old retry_timer_/response_timer_ creations); only the post-change
// version is kept here, as the duplicates made the constructor ill-formed.
HdsDelegate::HdsDelegate(const envoy::api::v2::core::Node& node, Stats::Scope& scope,
                         Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher,
                         Runtime::Loader& runtime, Envoy::Stats::Store& stats,
                         Ssl::ContextManager& ssl_context_manager, Runtime::RandomGenerator& random,
                         ClusterInfoFactory& info_factory,
                         AccessLog::AccessLogManager& access_log_manager, ClusterManager& cm,
                         const LocalInfo::LocalInfo& local_info)
    : stats_{ALL_HDS_STATS(POOL_COUNTER_PREFIX(scope, "hds_delegate."))},
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "envoy.service.discovery.v2.HealthDiscoveryService.StreamHealthCheck")),
      async_client_(std::move(async_client)), dispatcher_(dispatcher), runtime_(runtime),
      store_stats(stats), ssl_context_manager_(ssl_context_manager), random_(random),
      info_factory_(info_factory), access_log_manager_(access_log_manager), cm_(cm),
      local_info_(local_info) {
  // Identify this Envoy to the server in every HealthCheckRequest.
  health_check_request_.mutable_node()->MergeFrom(node);
  // Timer that retries stream establishment after a failure.
  hds_retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
  // Timer that paces periodic EndpointHealthResponse messages to the server.
  hds_stream_response_timer_ = dispatcher.createTimer([this]() -> void { sendResponse(); });
  establishNewStream();
}

void HdsDelegate::setRetryTimer() {
retry_timer_->enableTimer(std::chrono::milliseconds(RETRY_DELAY_MS));
void HdsDelegate::setHdsRetryTimer() {
hds_retry_timer_->enableTimer(std::chrono::milliseconds(RetryDelayMilliseconds));
}

// Arms the response timer so the next EndpointHealthResponse is sent after the
// server-specified interval (server_response_ms_, taken from the most recent
// HealthCheckSpecifier).
void HdsDelegate::setHdsStreamResponseTimer() {
  hds_stream_response_timer_->enableTimer(std::chrono::milliseconds(server_response_ms_));
}

void HdsDelegate::establishNewStream() {
Expand All @@ -32,20 +44,46 @@ void HdsDelegate::establishNewStream() {
return;
}

sendHealthCheckRequest();
}

// Advertises this Envoy's health checking capabilities to the HDS server by
// sending a HealthCheckRequest over the established stream.
void HdsDelegate::sendHealthCheckRequest() {
  ENVOY_LOG(debug, "Sending HealthCheckRequest");
  // TODO(lilika): Add support for other types of healthchecks
  // NOTE(review): add_health_check_protocol() appends on every call, so
  // invoking this method more than once would list HTTP repeatedly — confirm
  // this is only called once per stream.
  health_check_request_.mutable_capability()->add_health_check_protocol(
      envoy::service::discovery::v2::Capability::HTTP);
  // NOTE(review): this second debug log subsumes the first one above —
  // consider dropping one of them.
  ENVOY_LOG(debug, "Sending HealthCheckRequest {} ", health_check_request_.DebugString());
  stream_->sendMessage(health_check_request_, false);
  stats_.responses_.inc();
}

// TODO(lilika) : Use jittered backoff as in https://github.com/envoyproxy/envoy/pull/3791
// Called on stream/connection failure: records the error and schedules a
// reconnect attempt after a fixed delay.
// NOTE(review): the captured span also contained the pre-change log line
// ("Load reporter stats stream/connection failure...", using the removed
// RETRY_DELAY_MS constant); that residue is dropped here since it duplicated
// the statement below and referenced an identifier that no longer exists.
void HdsDelegate::handleFailure() {
  ENVOY_LOG(warn, "HdsDelegate stream/connection failure, will retry in {} ms.",
            RetryDelayMilliseconds);
  stats_.errors_.inc();
  setHdsRetryTimer();
}

// TODO(lilika): Add support for the same endpoint in different clusters/ports
// Builds and sends an EndpointHealthResponse covering every endpoint of every
// cluster this delegate is health checking, then re-arms the response timer.
envoy::service::discovery::v2::HealthCheckRequestOrEndpointHealthResponse
HdsDelegate::sendResponse() {
  envoy::service::discovery::v2::HealthCheckRequestOrEndpointHealthResponse response_msg;
  for (const auto& hds_cluster : hds_clusters_) {
    for (const auto& host_set : hds_cluster->prioritySet().hostSetsPerPriority()) {
      for (const auto& endpoint_host : host_set->hosts()) {
        auto* endpoint_health =
            response_msg.mutable_endpoint_health_response()->add_endpoints_health();
        Network::Utility::addressToProtobufAddress(
            *endpoint_host->address(), *endpoint_health->mutable_endpoint()->mutable_address());
        // TODO(lilika): Add support for more granular options of
        // envoy::api::v2::core::HealthStatus
        endpoint_health->set_health_status(endpoint_host->healthy()
                                               ? envoy::api::v2::core::HealthStatus::HEALTHY
                                               : envoy::api::v2::core::HealthStatus::UNHEALTHY);
      }
    }
  }
  ENVOY_LOG(debug, "Sending EndpointHealthResponse to server {}", response_msg.DebugString());
  stream_->sendMessage(response_msg, false);
  stats_.responses_.inc();
  setHdsStreamResponseTimer();
  return response_msg;
}

void HdsDelegate::onCreateInitialMetadata(Http::HeaderMap& metadata) {
Expand All @@ -56,12 +94,59 @@ void HdsDelegate::onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) {
UNREFERENCED_PARAMETER(metadata);
}

// Builds one HdsCluster (with its endpoints and active health checkers) for
// every cluster listed in the HealthCheckSpecifier received from the server.
// Takes ownership of the message; `message` must be non-null.
void HdsDelegate::processMessage(
    std::unique_ptr<envoy::service::discovery::v2::HealthCheckSpecifier>&& message) {
  // Validate before the first dereference. Previously the ASSERT ran only
  // after message->DebugString() had already been evaluated, defeating the
  // null check in debug builds.
  ASSERT(message);
  ENVOY_LOG(debug, "New health check response message {} ", message->DebugString());

  for (const auto& cluster_health_check : message->health_check()) {
    // Create HdsCluster config. The empty BindConfig is shared across all
    // clusters created here.
    static const envoy::api::v2::core::BindConfig bind_config;
    envoy::api::v2::Cluster cluster_config;

    cluster_config.set_name(cluster_health_check.cluster_name());
    cluster_config.mutable_connect_timeout()->set_seconds(ClusterTimeoutSeconds);
    cluster_config.mutable_per_connection_buffer_limit_bytes()->set_value(
        ClusterConnectionBufferLimitBytes);

    // Add endpoints to cluster.
    for (const auto& locality_endpoints : cluster_health_check.endpoints()) {
      for (const auto& endpoint : locality_endpoints.endpoints()) {
        cluster_config.add_hosts()->MergeFrom(endpoint.address());
      }
    }

    // TODO(lilika): Add support for optional per-endpoint health checks

    // Add healthchecks to cluster.
    for (const auto& health_check : cluster_health_check.health_checks()) {
      cluster_config.add_health_checks()->MergeFrom(health_check);
    }

    ENVOY_LOG(debug, "New HdsCluster config {} ", cluster_config.DebugString());

    // Create the HdsCluster (added_via_api = false) and immediately start
    // health checking its endpoints.
    hds_clusters_.emplace_back(new HdsCluster(runtime_, cluster_config, bind_config, store_stats,
                                              ssl_context_manager_, false, info_factory_, cm_,
                                              local_info_, dispatcher_, random_));

    hds_clusters_.back()->startHealthchecks(access_log_manager_, runtime_, random_, dispatcher_);
  }
}

// TODO(lilika): Add support for subsequent HealthCheckSpecifier messages that
// might modify the HdsClusters
// Handles a HealthCheckSpecifier from the server: creates the requested
// clusters/health checks and schedules the first EndpointHealthResponse.
// NOTE(review): the captured span also contained the removed pre-change body
// (old log line, stream_->sendMessage of the request, responses_ increment);
// that residue is dropped here.
void HdsDelegate::onReceiveMessage(
    std::unique_ptr<envoy::service::discovery::v2::HealthCheckSpecifier>&& message) {
  stats_.requests_.inc();
  ENVOY_LOG(debug, "New health check response message {} ", message->DebugString());

  // Read the server-requested reporting interval *before* handing the message
  // off: processMessage() takes the unique_ptr by move, after which
  // dereferencing `message` (as the original code did) is undefined behavior.
  server_response_ms_ = PROTOBUF_GET_MS_REQUIRED(*message, interval);

  // Process the HealthCheckSpecifier message.
  processMessage(std::move(message));

  setHdsStreamResponseTimer();
}

void HdsDelegate::onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) {
Expand All @@ -70,10 +155,95 @@ void HdsDelegate::onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) {

// Called when the server closes the stream: stop the periodic response timer
// (it would dereference the dead stream), drop the stream, and go through the
// failure/retry path.
// NOTE(review): the captured span also contained the removed pre-change
// `response_timer_->disableTimer();` line alongside the renamed one below;
// the residue is dropped since that member no longer exists.
void HdsDelegate::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
  ENVOY_LOG(warn, "gRPC config stream closed: {}, {}", status, message);
  hds_stream_response_timer_->disableTimer();
  stream_ = nullptr;
  handleFailure();
}

// Constructs a minimal cluster for HDS use: builds its ClusterInfo through the
// injected factory and materializes one host per configured address.
HdsCluster::HdsCluster(Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster,
                       const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats,
                       Ssl::ContextManager& ssl_context_manager, bool added_via_api,
                       ClusterInfoFactory& info_factory, ClusterManager& cm,
                       const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher,
                       Runtime::RandomGenerator& random)
    : runtime_(runtime), cluster_(cluster), bind_config_(bind_config), stats_(stats),
      ssl_context_manager_(ssl_context_manager), added_via_api_(added_via_api),
      initial_hosts_(new HostVector()) {
  ENVOY_LOG(debug, "Creating an HdsCluster");
  // Ensure the default (priority 0) host set exists before hosts are installed.
  priority_set_.getOrCreateHostSet(0);

  // Build the immutable ClusterInfo via the factory so tests can substitute
  // their own implementation.
  info_ =
      info_factory.createClusterInfo(runtime_, cluster_, bind_config_, stats_, ssl_context_manager_,
                                     added_via_api_, cm, local_info, dispatcher, random);

  // One HostImpl per configured host address, weight 1; metadata, locality and
  // per-endpoint health check config are all left at proto defaults.
  for (const auto& host : cluster.hosts()) {
    initial_hosts_->emplace_back(
        new HostImpl(info_, "", Network::Address::resolveProtoAddress(host),
                     envoy::api::v2::core::Metadata::default_instance(), 1,
                     envoy::api::v2::core::Locality().default_instance(),
                     envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance()));
  }
  // Publish the initial hosts into the priority set; no completion callback.
  initialize([] {});
}

// Unimplemented for HdsCluster; instances are constructed directly instead.
ClusterSharedPtr HdsCluster::create() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }

// Returns a freshly allocated vector holding only the hosts that currently
// report healthy.
HostVectorConstSharedPtr HdsCluster::createHealthyHostList(const HostVector& hosts) {
  HostVectorSharedPtr healthy_hosts(new HostVector());
  for (const auto& candidate : hosts) {
    if (!candidate->healthy()) {
      continue;
    }
    healthy_hosts->push_back(candidate);
  }
  return healthy_hosts;
}

// Production ClusterInfoFactory implementation: wires up the per-cluster stats
// scope and transport socket factory, then builds the real ClusterInfoImpl.
ClusterInfoConstSharedPtr ProdClusterInfoFactory::createClusterInfo(
    Runtime::Loader& runtime, const envoy::api::v2::Cluster& cluster,
    const envoy::api::v2::core::BindConfig& bind_config, Stats::Store& stats,
    Ssl::ContextManager& ssl_context_manager, bool added_via_api, ClusterManager& cm,
    const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher,
    Runtime::RandomGenerator& random) {

  // Stats for this cluster live under "cluster.<name>.".
  Envoy::Stats::ScopePtr stats_scope =
      stats.createScope(fmt::format("cluster.{}.", cluster.name()));

  // Context required to construct the upstream transport socket factory.
  Envoy::Server::Configuration::TransportSocketFactoryContextImpl transport_factory_context(
      ssl_context_manager, *stats_scope, cm, local_info, dispatcher, random, stats);

  Network::TransportSocketFactoryPtr transport_socket_factory =
      Upstream::createTransportSocketFactory(cluster, transport_factory_context);

  return std::make_unique<ClusterInfoImpl>(cluster, bind_config, runtime,
                                           std::move(transport_socket_factory),
                                           std::move(stats_scope), added_via_api);
}

// Creates and launches one active health checker for every health check
// configured on this cluster.
void HdsCluster::startHealthchecks(AccessLog::AccessLogManager& access_log_manager,
                                   Runtime::Loader& runtime, Runtime::RandomGenerator& random,
                                   Event::Dispatcher& dispatcher) {

  for (const auto& health_check_config : cluster_.health_checks()) {
    auto health_checker = Upstream::HealthCheckerFactory::create(
        health_check_config, *this, runtime, random, dispatcher, access_log_manager);
    health_checkers_.push_back(std::move(health_checker));
    health_checkers_.back()->start();
  }
}

// Marks every initial host as failed-active-HC (so it stays unhealthy until a
// health checker passes it) and installs the hosts into priority 0.
void HdsCluster::initialize(std::function<void()> callback) {
  // NOTE(review): the callback is stored but never invoked anywhere in this
  // block — confirm whether completion signaling is intentionally omitted.
  initialization_complete_callback_ = callback;
  for (const auto& host : *initial_hosts_) {
    host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
  }

  auto& first_host_set = priority_set_.getOrCreateHostSet(0);
  auto healthy = createHealthyHostList(*initial_hosts_);

  // Install hosts into priority 0: no per-locality partitioning (empty
  // HostsPerLocality), all initial hosts reported as "added", none removed.
  first_host_set.updateHosts(initial_hosts_, healthy, HostsPerLocalityImpl::empty(),
                             HostsPerLocalityImpl::empty(), {}, *initial_hosts_, {});
}

// Outlier detection is not implemented for HDS-managed clusters.
void HdsCluster::setOutlierDetector(const Outlier::DetectorSharedPtr&) {
  NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

} // namespace Upstream
} // namespace Envoy
Loading