Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/root/intro/arch_overview/upstream/aggregate_cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,27 @@ cannot locate that specific endpoint at the aggregate level. As a result, Statef
aggregate clusters, because the final cluster choice is made without direct knowledge of the specific endpoint which
doesn't exist at the top level.

Circuit Breaking
^^^^^^^^^^^^^^^^^
:ref:`Circuit Breakers <envoy_v3_api_msg_config.cluster.v3.circuitbreakers>` on the level of the aggregate cluster are
not configurable. The only circuit breaker that is configured on the aggregate cluster's level is **max_retries**.

This behavior is intentional as the underlying clusters are accessible through multiple paths (directly or via the
aggregate cluster(s)), and configuring the circuit breakers at the level of the aggregate cluster will multiply the
effective limit by the number of access paths. This would make the circuit breakers useless.

Unlike other circuit breakers, the **max_retries** circuit breaker must be configured at the aggregate cluster level
because when Envoy processes a retry request, it needs to determine whether the retry limit has been exceeded before it
selects the underlying cluster.

* For **max_connections**, **max_requests** and **max_pending_requests**:
when the configured limit is reached on the underlying cluster(s), only the underlying cluster(s)' circuit breaker opens. The aggregate cluster's circuit breaker remains closed at all times, regardless of whether the circuit breakers limits on the underlying cluster(s) are reached or not.

* For **max_retries**:
when the requests through the aggregate cluster reach the aggregate cluster's `max_retries` limit, the aggregate
cluster's circuit breaker opens and the underlying cluster(s)' circuit breakers remain closed. when the requests going directly to the underlying cluster(s) reach the underlying cluster(s) `max_retries` limit. The underlying
cluster(s)' circuit breaker opens.

Load Balancing Example
----------------------

Expand Down
123 changes: 59 additions & 64 deletions test/extensions/clusters/aggregate/cluster_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,70 @@ struct CircuitBreakerLimits {
uint32_t max_retries = 3;
uint32_t max_connection_pools = std::numeric_limits<uint32_t>::max();

CircuitBreakerLimits withMaxConnections(uint32_t max_connections) const {
CircuitBreakerLimits limits = *this;
limits.max_connections = max_connections;
return limits;
CircuitBreakerLimits& withMaxConnections(uint32_t max_connections) {
this->max_connections = max_connections;
return *this;
}

CircuitBreakerLimits withMaxRequests(uint32_t max_requests) const {
CircuitBreakerLimits limits = *this;
limits.max_requests = max_requests;
return limits;
CircuitBreakerLimits& withMaxRequests(uint32_t max_requests) {
this->max_requests = max_requests;
return *this;
}

CircuitBreakerLimits withMaxPendingRequests(uint32_t max_pending_requests) const {
CircuitBreakerLimits limits = *this;
limits.max_pending_requests = max_pending_requests;
return limits;
CircuitBreakerLimits& withMaxPendingRequests(uint32_t max_pending_requests) {
this->max_pending_requests = max_pending_requests;
return *this;
}

CircuitBreakerLimits withMaxRetries(uint32_t max_retries) const {
CircuitBreakerLimits limits = *this;
limits.max_retries = max_retries;
return limits;
CircuitBreakerLimits& withMaxRetries(uint32_t max_retries) {
this->max_retries = max_retries;
return *this;
}
};

void setCircuitBreakerLimits(envoy::config::cluster::v3::Cluster& cluster,
const CircuitBreakerLimits& limits) {
auto* cluster_circuit_breakers = cluster.mutable_circuit_breakers();

auto* cluster_circuit_breakers_threshold_default = cluster_circuit_breakers->add_thresholds();
cluster_circuit_breakers_threshold_default->set_priority(
envoy::config::core::v3::RoutingPriority::DEFAULT);

cluster_circuit_breakers_threshold_default->mutable_max_connections()->set_value(
limits.max_connections);
cluster_circuit_breakers_threshold_default->mutable_max_pending_requests()->set_value(
limits.max_pending_requests);
cluster_circuit_breakers_threshold_default->mutable_max_requests()->set_value(
limits.max_requests);
cluster_circuit_breakers_threshold_default->mutable_max_retries()->set_value(limits.max_retries);
cluster_circuit_breakers_threshold_default->mutable_max_connection_pools()->set_value(
limits.max_connection_pools);
cluster_circuit_breakers_threshold_default->set_track_remaining(true);
};

void setMaxConcurrentStreams(envoy::config::cluster::v3::Cluster& cluster,
uint32_t max_concurrent_streams) {
envoy::extensions::upstreams::http::v3::HttpProtocolOptions http_protocol_options;
http_protocol_options.mutable_explicit_http_config()
->mutable_http2_protocol_options()
->mutable_max_concurrent_streams()
->set_value(max_concurrent_streams);
(*cluster.mutable_typed_extension_protocol_options())
["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"]
.PackFrom(http_protocol_options);
};

void reduceAggregateClustersListToOneCluster(
envoy::config::cluster::v3::Cluster& aggregate_cluster) {
auto* aggregate_cluster_type = aggregate_cluster.mutable_cluster_type();
auto* aggregate_cluster_typed_config = aggregate_cluster_type->mutable_typed_config();
envoy::extensions::clusters::aggregate::v3::ClusterConfig new_aggregate_cluster_typed_config;
aggregate_cluster_typed_config->UnpackTo(&new_aggregate_cluster_typed_config);
new_aggregate_cluster_typed_config.clear_clusters();
new_aggregate_cluster_typed_config.add_clusters("cluster_1");
aggregate_cluster_typed_config->PackFrom(new_aggregate_cluster_typed_config);
};

const std::string& config() {
CONSTRUCT_ON_FIRST_USE(std::string, fmt::format(R"EOF(
admin:
Expand Down Expand Up @@ -211,50 +250,6 @@ class AggregateIntegrationTest
xds_stream_->startGrpcStream();
}

void setCircuitBreakerLimits(envoy::config::cluster::v3::Cluster& cluster,
const CircuitBreakerLimits& limits) {
auto* cluster_circuit_breakers = cluster.mutable_circuit_breakers();

auto* cluster_circuit_breakers_threshold_default = cluster_circuit_breakers->add_thresholds();
cluster_circuit_breakers_threshold_default->set_priority(
envoy::config::core::v3::RoutingPriority::DEFAULT);

cluster_circuit_breakers_threshold_default->mutable_max_connections()->set_value(
limits.max_connections);
cluster_circuit_breakers_threshold_default->mutable_max_pending_requests()->set_value(
limits.max_pending_requests);
cluster_circuit_breakers_threshold_default->mutable_max_requests()->set_value(
limits.max_requests);
cluster_circuit_breakers_threshold_default->mutable_max_retries()->set_value(
limits.max_retries);
cluster_circuit_breakers_threshold_default->mutable_max_connection_pools()->set_value(
limits.max_connection_pools);
cluster_circuit_breakers_threshold_default->set_track_remaining(true);
}

void setMaxConcurrentStreams(envoy::config::cluster::v3::Cluster& cluster,
uint32_t max_concurrent_streams) {
envoy::extensions::upstreams::http::v3::HttpProtocolOptions http_protocol_options;
http_protocol_options.mutable_explicit_http_config()
->mutable_http2_protocol_options()
->mutable_max_concurrent_streams()
->set_value(max_concurrent_streams);
(*cluster.mutable_typed_extension_protocol_options())
["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"]
.PackFrom(http_protocol_options);
}

void
reduceAggregateClustersListToOneCluster(envoy::config::cluster::v3::Cluster& aggregate_cluster) {
auto* aggregate_cluster_type = aggregate_cluster.mutable_cluster_type();
auto* aggregate_cluster_typed_config = aggregate_cluster_type->mutable_typed_config();
envoy::extensions::clusters::aggregate::v3::ClusterConfig new_aggregate_cluster_typed_config;
aggregate_cluster_typed_config->UnpackTo(&new_aggregate_cluster_typed_config);
new_aggregate_cluster_typed_config.clear_clusters();
new_aggregate_cluster_typed_config.add_clusters("cluster_1");
aggregate_cluster_typed_config->PackFrom(new_aggregate_cluster_typed_config);
}

const bool deferred_cluster_creation_;
envoy::config::cluster::v3::Cluster cluster1_;
envoy::config::cluster::v3::Cluster cluster2_;
Expand Down Expand Up @@ -388,7 +383,7 @@ TEST_P(AggregateIntegrationTest, PreviousPrioritiesRetryPredicate) {
TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) {
setDownstreamProtocol(Http::CodecType::HTTP2);

config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
auto* static_resources = bootstrap.mutable_static_resources();
auto* aggregate_cluster = static_resources->mutable_clusters(1);

Expand Down Expand Up @@ -510,7 +505,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxConnections) {
TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) {
setDownstreamProtocol(Http::CodecType::HTTP2);

config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
auto* static_resources = bootstrap.mutable_static_resources();
auto* aggregate_cluster = static_resources->mutable_clusters(1);

Expand Down Expand Up @@ -630,7 +625,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxRequests) {
TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) {
setDownstreamProtocol(Http::CodecType::HTTP2);

config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
auto* static_resources = bootstrap.mutable_static_resources();
auto* aggregate_cluster = static_resources->mutable_clusters(1);

Expand Down Expand Up @@ -771,7 +766,7 @@ TEST_P(AggregateIntegrationTest, CircuitBreakerTestMaxPendingRequests) {
TEST_P(AggregateIntegrationTest, CircuitBreakerMaxRetriesTest) {
setDownstreamProtocol(Http::CodecType::HTTP2);

config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
auto* static_resources = bootstrap.mutable_static_resources();
auto* listener = static_resources->mutable_listeners(0);
auto* filter_chain = listener->mutable_filter_chains(0);
Expand Down