Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/root/intro/arch_overview/other_protocols/redis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ following information:
* The primaries for each shard.
* Nodes entering or leaving the cluster.

Envoy supports identifying nodes by either IP addresses or hostnames in the ``cluster slots`` command response. If a primary hostname fails to resolve, Envoy retries resolution of all nodes periodically until it succeeds. If a replica hostname fails to resolve, that replica is simply skipped.

For topology configuration details, see the Redis Cluster
:ref:`v3 API reference <envoy_v3_api_msg_extensions.clusters.redis.v3.RedisClusterConfig>`.

Expand Down
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ New Features
* http3: supports upstream HTTP/3 retries. Automatically retry `0-RTT safe requests <https://www.rfc-editor.org/rfc/rfc7231#section-4.2.1>`_ if they are rejected because they are sent `too early <https://datatracker.ietf.org/doc/html/rfc8470#section-5.2>`_. And automatically retry 0-RTT safe requests if connect attempt fails later on and the cluster is configured with TCP fallback. And add retry on ``http3-post-connect-failure`` policy which allows retry of failed HTTP/3 requests with TCP fallback even after handshake if the cluster is configured with TCP fallback. This feature is guarded by ``envoy.reloadable_features.conn_pool_new_stream_with_early_data_and_http3``.
* matching: the matching API can now express a match tree that will always match by omitting a matcher at the top level.
* outlier_detection: :ref:`max_ejection_time_jitter<envoy_v3_api_field_config.cluster.v3.OutlierDetection.max_ejection_time_jitter>` configuration added to allow adding a random value to the ejection time to prevent 'thundering herd' scenarios. Defaults to 0 so as to not break or change the behavior of existing deployments.
* redis: support for hostnames returned in `cluster slots` response is now available.
* schema_validator_tool: added ``bootstrap`` checking to the
:ref:`schema validator check tool <install_tools_schema_validator_check_tool>`. Also fixed linking
of all extensions into the tool so that all typed configurations can be properly verified.
Expand Down
238 changes: 205 additions & 33 deletions source/extensions/clusters/redis/redis_cluster.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "redis_cluster.h"

#include <cstdint>
#include <memory>

#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.h"
#include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.validate.h"
Expand Down Expand Up @@ -90,7 +93,7 @@ void RedisCluster::updateAllHosts(const Upstream::HostVector& hosts_added,
hosts_added, hosts_removed, absl::nullopt);
}

void RedisCluster::onClusterSlotUpdate(ClusterSlotsPtr&& slots) {
void RedisCluster::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots) {
Upstream::HostVector new_hosts;
absl::flat_hash_set<std::string> all_new_hosts;

Expand Down Expand Up @@ -222,27 +225,13 @@ RedisCluster::RedisDiscoverySession::RedisDiscoverySession(
NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats(
parent_.info()->statsScope().symbolTable())) {}

// Convert the cluster slot IP/Port response to and address, return null if the response
// Convert the cluster slot IP/Port response to an address, return null if the response
// does not match the expected type.
Network::Address::InstanceConstSharedPtr
RedisCluster::RedisDiscoverySession::RedisDiscoverySession::ProcessCluster(
const NetworkFilters::Common::Redis::RespValue& value) {
if (value.type() != NetworkFilters::Common::Redis::RespType::Array) {
return nullptr;
}
auto& array = value.asArray();

if (array.size() < 2 || array[0].type() != NetworkFilters::Common::Redis::RespType::BulkString ||
array[1].type() != NetworkFilters::Common::Redis::RespType::Integer) {
return nullptr;
}

try {
return Network::Utility::parseInternetAddress(array[0].asString(), array[1].asInteger(), false);
} catch (const EnvoyException& ex) {
ENVOY_LOG(debug, "Invalid ip address in CLUSTER SLOTS response: {}", ex.what());
return nullptr;
}
RedisCluster::RedisDiscoverySession::RedisDiscoverySession::ipAddressFromClusterEntry(
const std::vector<NetworkFilters::Common::Redis::RespValue>& array) {
return Network::Utility::parseInternetAddressNoThrow(array[0].asString(), array[1].asInteger(),
false);
}

RedisCluster::RedisDiscoverySession::~RedisDiscoverySession() {
Expand Down Expand Up @@ -316,6 +305,124 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() {
current_request_ = client->client_->makeRequest(ClusterSlotsRequest::instance_, *this);
}

// Records the outcome of a single DNS lookup in the parent cluster's stats.
// A failed lookup bumps `update_failure_`; a lookup that did not fail but
// returned no addresses bumps `update_empty_`. Any other outcome leaves the
// stats untouched.
void RedisCluster::RedisDiscoverySession::updateDnsStats(
    Network::DnsResolver::ResolutionStatus status, bool empty_response) {
  const bool lookup_failed = (status == Network::DnsResolver::ResolutionStatus::Failure);
  if (lookup_failed) {
    parent_.info_->stats().update_failure_.inc();
    return;
  }
  if (empty_response) {
    parent_.info_->stats().update_empty_.inc();
  }
}

/**
 * Resolve the primary hostname of each slot whose primary address is still unset.
 * Once a primary is successfully resolved, we proceed to resolve that slot's replicas.
 * Slots whose primary is already set go straight to replica resolution.
 * The shared count of hostnames that still require resolution is decremented as lookups
 * complete; replica resolution uses it to decide when to call the post-resolution hook.
 *
 * If resolving any one of the primaries fails (or yields no addresses), we abandon this
 * slot's resolution and re-arm the resolve timer so the whole discovery is retried later.
 * Failure to resolve a replica, on the other hand, does not stop the process: a warning is
 * logged and the remaining replicas are still resolved.
 *
 * @param slots the list of slots which may need DNS resolution
 * @param hostname_resolution_required_cnt the number of hostnames that still need DNS resolution
 */
void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(
    ClusterSlotsSharedPtr&& slots,
    std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt) {
  for (uint64_t slot_idx = 0; slot_idx < slots->size(); slot_idx++) {
    auto& slot = (*slots)[slot_idx];
    if (slot.primary() == nullptr) {
      ENVOY_LOG(debug,
                "starting async DNS resolution for primary slot address {} at index location {}",
                slot.primary_hostname_, slot_idx);
      // Both shared pointers are captured by value: the callback may run after this function
      // has returned, at which point a reference to the by-value parameter would dangle.
      parent_.dns_resolver_->resolve(
          slot.primary_hostname_, parent_.dns_lookup_family_,
          [this, slot_idx, slots,
           hostname_resolution_required_cnt](Network::DnsResolver::ResolutionStatus status,
                                             std::list<Network::DnsResponse>&& response) -> void {
            auto& slot = (*slots)[slot_idx];
            ENVOY_LOG(debug, "async DNS resolution complete for {}", slot.primary_hostname_);
            updateDnsStats(status, response.empty());
            // If DNS resolution for a primary fails, we stop resolution for this slot and reset
            // the timer. An empty answer is treated as a failure too, because there is no
            // address to read from `response.front()`.
            if (status != Network::DnsResolver::ResolutionStatus::Success || response.empty()) {
              ENVOY_LOG(error, "Unable to resolve cluster slot primary hostname {}",
                        slot.primary_hostname_);
              resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
              return;
            }
            // Primary slot address resolved
            slot.setPrimary(Network::Utility::getAddressWithPort(
                *response.front().addrInfo().address_, slot.primary_port_));
            (*hostname_resolution_required_cnt)--;
            // Continue on to resolve replicas
            resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt);
          });
    } else {
      // Primary address is already known; only the replicas may still need resolution.
      resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt);
    }
  }
}

/**
 * Resolve the replicas in a cluster entry. If there are no replicas to resolve, simply return
 * (after calling the post-resolution method when nothing else is pending).
 * When the pending-hostname count reaches zero, call the post-resolution method.
 * Failure to resolve a replica does not stop the overall resolution process. We log a
 * warning, and move to the next one.
 *
 * @param slots the list of slots which may need DNS resolution
 * @param index the specific index into `slots` whose replicas need to be resolved
 * @param hostname_resolution_required_cnt the number of hostnames that still need to be resolved
 */
void RedisCluster::RedisDiscoverySession::resolveReplicas(
    ClusterSlotsSharedPtr slots, std::size_t index,
    std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt) {
  auto& slot = (*slots)[index];
  if (slot.replicas_to_resolve_.empty()) {
    if (*hostname_resolution_required_cnt == 0) {
      finishClusterHostnameResolution(slots);
    }
    return;
  }

  for (uint64_t replica_idx = 0; replica_idx < slot.replicas_to_resolve_.size(); replica_idx++) {
    const auto replica = slot.replicas_to_resolve_[replica_idx];
    ENVOY_LOG(debug, "starting async DNS resolution for replica address {}", replica.first);
    // Both shared pointers are captured by value: the callback may run after this function
    // has returned, at which point a reference to the by-value parameter would dangle.
    parent_.dns_resolver_->resolve(
        replica.first, parent_.dns_lookup_family_,
        [this, index, slots, replica_idx,
         hostname_resolution_required_cnt](Network::DnsResolver::ResolutionStatus status,
                                           std::list<Network::DnsResponse>&& response) -> void {
          auto& slot = (*slots)[index];
          auto& replica = slot.replicas_to_resolve_[replica_idx];
          ENVOY_LOG(debug, "async DNS resolution complete for {}", replica.first);
          updateDnsStats(status, response.empty());
          // If DNS resolution fails here, we move on to resolve other replicas in the list.
          // We log a warn message. An empty answer is handled the same way, because there is
          // no address to read from `response.front()`.
          if (status != Network::DnsResolver::ResolutionStatus::Success || response.empty()) {
            ENVOY_LOG(warn, "Unable to resolve cluster replica address {}", replica.first);
          } else {
            // Replica resolved
            slot.addReplica(Network::Utility::getAddressWithPort(
                *response.front().addrInfo().address_, replica.second));
          }
          (*hostname_resolution_required_cnt)--;
          // Finish resolution if all the hostnames have been processed. The counter is
          // unsigned, so "nothing left" is exactly zero.
          if (*hostname_resolution_required_cnt == 0) {
            finishClusterHostnameResolution(slots);
          }
        });
  }
}

// Post-resolution hook: hands the slot list to the parent cluster and then
// re-arms the resolve timer with the cluster refresh rate.
void RedisCluster::RedisDiscoverySession::finishClusterHostnameResolution(
    ClusterSlotsSharedPtr slots) {
  // Hand ownership of our copy of the shared pointer to the parent cluster.
  parent_.onClusterSlotUpdate(std::move(slots));

  // Schedule the next run using the parent's refresh rate.
  const auto& refresh_interval = parent_.cluster_refresh_rate_;
  resolve_timer_->enableTimer(refresh_interval);
}

void RedisCluster::RedisDiscoverySession::onResponse(
NetworkFilters::Common::Redis::RespValuePtr&& value) {
current_request_ = nullptr;
Expand All @@ -331,14 +438,31 @@ void RedisCluster::RedisDiscoverySession::onResponse(
return;
}

auto slots = std::make_unique<std::vector<ClusterSlot>>();

auto cluster_slots = std::make_shared<std::vector<ClusterSlot>>();

// https://redis.io/commands/cluster-slots
// CLUSTER SLOTS represents nested array of redis instances, like this:
//
// 1) 1) (integer) 0 <-- start slot range
// 2) (integer) 5460 <-- end slot range
//
// 3) 1) "127.0.0.1" <-- primary slot IP ADDR(HOSTNAME)
// 2) (integer) 30001 <-- primary slot PORT
// 3) "09dbe9720cda62f7865eabc5fd8857c5d2678366"
//
// 4) 1) "127.0.0.2" <-- replica slot IP ADDR(HOSTNAME)
// 2) (integer) 30004 <-- replica slot PORT
// 3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf"
//
// Loop through the cluster slot response and error checks for each field.
auto hostname_resolution_required_cnt = std::make_shared<std::uint64_t>(0);
for (const NetworkFilters::Common::Redis::RespValue& part : value->asArray()) {
if (part.type() != NetworkFilters::Common::Redis::RespType::Array) {
onUnexpectedResponse(value);
return;
}

// Row 1-2: Slot ranges
const std::vector<NetworkFilters::Common::Redis::RespValue>& slot_range = part.asArray();
if (slot_range.size() < 3 ||
slot_range[SlotRangeStart].type() !=
Expand All @@ -351,29 +475,77 @@ void RedisCluster::RedisDiscoverySession::onResponse(
return;
}

// Field 2: Primary address for slot range
auto primary_address = ProcessCluster(slot_range[SlotPrimary]);
if (!primary_address) {
// Row 3: Primary slot address
if (!validateCluster(slot_range[SlotPrimary])) {
onUnexpectedResponse(value);
return;
}
// Try to parse primary slot address as IP address
// It may fail in case the address is a hostname. If this is the case - we'll come back later
// and try to resolve hostnames asynchronously. For example, AWS ElastiCache returns hostname
// instead of IP address.
ClusterSlot slot(slot_range[SlotRangeStart].asInteger(), slot_range[SlotRangeEnd].asInteger(),
ipAddressFromClusterEntry(slot_range[SlotPrimary].asArray()));
if (slot.primary() == nullptr) {
// Primary address is potentially a hostname, save it for async DNS resolution.
const auto& array = slot_range[SlotPrimary].asArray();
slot.primary_hostname_ = array[0].asString();
slot.primary_port_ = array[1].asInteger();
(*hostname_resolution_required_cnt)++;
}

slots->emplace_back(slot_range[SlotRangeStart].asInteger(),
slot_range[SlotRangeEnd].asInteger(), primary_address);

// Row 4-N: Replica(s) addresses
for (auto replica = std::next(slot_range.begin(), SlotReplicaStart);
replica != slot_range.end(); ++replica) {
auto replica_address = ProcessCluster(*replica);
if (!replica_address) {
if (!validateCluster(*replica)) {
onUnexpectedResponse(value);
return;
}
slots->back().addReplica(std::move(replica_address));
auto replica_address = ipAddressFromClusterEntry(replica->asArray());
if (replica_address) {
slot.addReplica(std::move(replica_address));
} else {
// Replica address is potentially a hostname, save it for async DNS resolution.
const auto& array = replica->asArray();
slot.addReplicaToResolve(array[0].asString(), array[1].asInteger());
(*hostname_resolution_required_cnt)++;
}
}
cluster_slots->push_back(std::move(slot));
}

parent_.onClusterSlotUpdate(std::move(slots));
resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
if (*hostname_resolution_required_cnt > 0) {
// DNS resolution is required, defer finalizing the slot update until resolution is complete.
resolveClusterHostnames(std::move(cluster_slots), hostname_resolution_required_cnt);
} else {
// All slots addresses were represented by IP/Port pairs.
parent_.onClusterSlotUpdate(std::move(cluster_slots));
resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
}
}

// Ensure that a node entry of the Slot Cluster response has a valid format.
//
// A valid entry is an array of at least two elements whose first element is a non-empty
// bulk string (IP address or hostname) and whose second element is an integer port in the
// range [0, 65535].
//
// @param value the node entry to validate
// @return true if the entry is well formed, false otherwise
bool RedisCluster::RedisDiscoverySession::validateCluster(
    const NetworkFilters::Common::Redis::RespValue& value) {
  // Verify data types
  if (value.type() != NetworkFilters::Common::Redis::RespType::Array) {
    return false;
  }
  const auto& array = value.asArray();
  if (array.size() < 2 || array[0].type() != NetworkFilters::Common::Redis::RespType::BulkString ||
      array[1].type() != NetworkFilters::Common::Redis::RespType::Integer) {
    return false;
  }
  // Verify IP/Host address
  if (array[0].asString().empty()) {
    return false;
  }
  // Verify port: reject anything outside the 16-bit port range, including negative values,
  // which would otherwise be silently narrowed when later used as a port number.
  const int64_t port = array[1].asInteger();
  if (port < 0 || port > 0xffff) {
    return false;
  }

  return true;
}

void RedisCluster::RedisDiscoverySession::onUnexpectedResponse(
Expand Down
11 changes: 9 additions & 2 deletions source/extensions/clusters/redis/redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
void updateAllHosts(const Upstream::HostVector& hosts_added,
const Upstream::HostVector& hosts_removed, uint32_t priority);

void onClusterSlotUpdate(ClusterSlotsPtr&&);
void onClusterSlotUpdate(ClusterSlotsSharedPtr&&);

void reloadHealthyHostsHelper(const Upstream::HostSharedPtr& host) override;

Expand Down Expand Up @@ -242,7 +242,14 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
void onUnexpectedResponse(const NetworkFilters::Common::Redis::RespValuePtr&);

Network::Address::InstanceConstSharedPtr
ProcessCluster(const NetworkFilters::Common::Redis::RespValue& value);
ipAddressFromClusterEntry(const std::vector<NetworkFilters::Common::Redis::RespValue>& value);
bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value);
void resolveClusterHostnames(ClusterSlotsSharedPtr&& slots,
std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt);
void resolveReplicas(ClusterSlotsSharedPtr slots, std::size_t index,
std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt);
void finishClusterHostnameResolution(ClusterSlotsSharedPtr slots);
void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response);

RedisCluster& parent_;
Event::Dispatcher& dispatcher_;
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/clusters/redis/redis_cluster_lb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ bool ClusterSlot::operator==(const Envoy::Extensions::Clusters::Redis::ClusterSl
}

// RedisClusterLoadBalancerFactory
bool RedisClusterLoadBalancerFactory::onClusterSlotUpdate(ClusterSlotsPtr&& slots,
bool RedisClusterLoadBalancerFactory::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots,
Envoy::Upstream::HostMap& all_hosts) {
// The slots is sorted, allowing for a quick comparison to make sure we need to update the slot
// array sort based on start and end to enable efficient comparison
Expand Down
Loading