diff --git a/docs/root/intro/arch_overview/other_protocols/redis.rst b/docs/root/intro/arch_overview/other_protocols/redis.rst index 0349a0d8b888c..944879300450d 100644 --- a/docs/root/intro/arch_overview/other_protocols/redis.rst +++ b/docs/root/intro/arch_overview/other_protocols/redis.rst @@ -81,6 +81,8 @@ following information: * The primaries for each shard. * Nodes entering or leaving the cluster. +Envoy proxy supports identification of the nodes via both IP address and hostnames in the `cluster slots` command response. In case of failure to resolve a primary hostname, Envoy will retry resolution of all nodes periodically until success. Failure to resolve a replica simply skips that replica. + For topology configuration details, see the Redis Cluster :ref:`v3 API reference `. diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 51ce30b61fbaa..877a1bfd8c0b8 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -77,6 +77,7 @@ New Features * http3: supports upstream HTTP/3 retries. Automatically retry `0-RTT safe requests `_ if they are rejected because they are sent `too early `_. And automatically retry 0-RTT safe requests if connect attempt fails later on and the cluster is configured with TCP fallback. And add retry on ``http3-post-connect-failure`` policy which allows retry of failed HTTP/3 requests with TCP fallback even after handshake if the cluster is configured with TCP fallback. This feature is guarded by ``envoy.reloadable_features.conn_pool_new_stream_with_early_data_and_http3``. * matching: the matching API can now express a match tree that will always match by omitting a matcher at the top level. * outlier_detection: :ref:`max_ejection_time_jitter` configuration added to allow adding a random value to the ejection time to prevent 'thundering herd' scenarios. Defaults to 0 so as to not break or change the behavior of existing deployments. +* redis: support for hostnames returned in `cluster slots` response is now available. * schema_validator_tool: added ``bootstrap`` checking to the :ref:`schema validator check tool `. Also fixed linking of all extensions into the tool so that all typed configurations can be properly verified. diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 9a64b58050a08..71ce6e69cd71b 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -1,5 +1,8 @@ #include "redis_cluster.h" +#include +#include + #include "envoy/config/cluster/v3/cluster.pb.h" #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.h" #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.validate.h" @@ -90,7 +93,7 @@ void RedisCluster::updateAllHosts(const Upstream::HostVector& hosts_added, hosts_added, hosts_removed, absl::nullopt); } -void RedisCluster::onClusterSlotUpdate(ClusterSlotsPtr&& slots) { +void RedisCluster::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots) { Upstream::HostVector new_hosts; absl::flat_hash_set all_new_hosts; @@ -222,27 +225,13 @@ RedisCluster::RedisDiscoverySession::RedisDiscoverySession( NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats( parent_.info()->statsScope().symbolTable())) {} -// Convert the cluster slot IP/Port response to and address, return null if the response +// Convert the cluster slot IP/Port response to an address, return null if the response // does not match the expected type. Network::Address::InstanceConstSharedPtr -RedisCluster::RedisDiscoverySession::RedisDiscoverySession::ProcessCluster( - const NetworkFilters::Common::Redis::RespValue& value) { - if (value.type() != NetworkFilters::Common::Redis::RespType::Array) { - return nullptr; - } - auto& array = value.asArray(); - - if (array.size() < 2 || array[0].type() != NetworkFilters::Common::Redis::RespType::BulkString || - array[1].type() != NetworkFilters::Common::Redis::RespType::Integer) { - return nullptr; - } - - try { - return Network::Utility::parseInternetAddress(array[0].asString(), array[1].asInteger(), false); - } catch (const EnvoyException& ex) { - ENVOY_LOG(debug, "Invalid ip address in CLUSTER SLOTS response: {}", ex.what()); - return nullptr; - } +RedisCluster::RedisDiscoverySession::RedisDiscoverySession::ipAddressFromClusterEntry( + const std::vector& array) { + return Network::Utility::parseInternetAddressNoThrow(array[0].asString(), array[1].asInteger(), + false); } RedisCluster::RedisDiscoverySession::~RedisDiscoverySession() { @@ -316,6 +305,124 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { current_request_ = client->client_->makeRequest(ClusterSlotsRequest::instance_, *this); } +void RedisCluster::RedisDiscoverySession::updateDnsStats( + Network::DnsResolver::ResolutionStatus status, bool empty_response) { + if (status == Network::DnsResolver::ResolutionStatus::Failure) { + parent_.info_->stats().update_failure_.inc(); + } else if (empty_response) { + parent_.info_->stats().update_empty_.inc(); + } +} + +/** + * Resolve the primary cluster entry hostname in each slot. + * If the primary is successfully resolved, we proceed to resolve replicas. + * We use the count of hostnames that require resolution to decide when the resolution process is + * completed, and then call the post-resolution hooks. + * + * If resolving any one of the primary replicas fails, we stop the resolution process and reset + * the timers to retry the resolution. Failure to resolve a replica, on the other hand does not + * stop the process. If we replica resolution fails, we simply log a warning, and move to resolving + * the rest. + * + * @param slots the list of slots which may need DNS resolution + * @param address_resolution_required_cnt the number of hostnames that need DNS resolution + */ +void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( + ClusterSlotsSharedPtr&& slots, + std::shared_ptr hostname_resolution_required_cnt) { + for (uint64_t slot_idx = 0; slot_idx < slots->size(); slot_idx++) { + auto& slot = (*slots)[slot_idx]; + if (slot.primary() == nullptr) { + ENVOY_LOG(debug, + "starting async DNS resolution for primary slot address {} at index location {}", + slot.primary_hostname_, slot_idx); + parent_.dns_resolver_->resolve( + slot.primary_hostname_, parent_.dns_lookup_family_, + [this, slot_idx, slots, + &hostname_resolution_required_cnt](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { + auto& slot = (*slots)[slot_idx]; + ENVOY_LOG(debug, "async DNS resolution complete for {}", slot.primary_hostname_); + updateDnsStats(status, response.empty()); + // If DNS resolution for a primary fails, we stop resolution for remaining, and reset + // the timer. + if (status != Network::DnsResolver::ResolutionStatus::Success) { + ENVOY_LOG(error, "Unable to resolve cluster slot primary hostname {}", + slot.primary_hostname_); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + return; + } + // Primary slot address resolved + slot.setPrimary(Network::Utility::getAddressWithPort( + *response.front().addrInfo().address_, slot.primary_port_)); + (*hostname_resolution_required_cnt)--; + // Continue on to resolve replicas + resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt); + }); + } else { + resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt); + } + } +} + +/** + * Resolve the replicas in a cluster entry. If there are no replicas, simply return. + * If all the hostnames have been resolved, call post-resolution methods. + * Failure to resolve a replica does not stop the overall resolution process. We log a + * warning, and move to the next one. + * + * @param slots the list of slots which may need DNS resolution + * @param index the specific index into `slots` whose replicas need to be resolved + * @param address_resolution_required_cnt the number of address that need to be resolved + */ +void RedisCluster::RedisDiscoverySession::resolveReplicas( + ClusterSlotsSharedPtr slots, std::size_t index, + std::shared_ptr hostname_resolution_required_cnt) { + auto& slot = (*slots)[index]; + if (slot.replicas_to_resolve_.empty()) { + if (*hostname_resolution_required_cnt == 0) { + finishClusterHostnameResolution(slots); + } + return; + } + + for (uint64_t replica_idx = 0; replica_idx < slot.replicas_to_resolve_.size(); replica_idx++) { + auto replica = slot.replicas_to_resolve_[replica_idx]; + ENVOY_LOG(debug, "starting async DNS resolution for replica address {}", replica.first); + parent_.dns_resolver_->resolve( + replica.first, parent_.dns_lookup_family_, + [this, index, slots, replica_idx, + &hostname_resolution_required_cnt](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { + auto& slot = (*slots)[index]; + auto& replica = slot.replicas_to_resolve_[replica_idx]; + ENVOY_LOG(debug, "async DNS resolution complete for {}", replica.first); + updateDnsStats(status, response.empty()); + // If DNS resolution fails here, we move on to resolve other replicas in the list. + // We log a warn message. + if (status != Network::DnsResolver::ResolutionStatus::Success) { + ENVOY_LOG(warn, "Unable to resolve cluster replica address {}", replica.first); + } else { + // Replica resolved + slot.addReplica(Network::Utility::getAddressWithPort( + *response.front().addrInfo().address_, replica.second)); + } + (*hostname_resolution_required_cnt)--; + // finish resolution if all the addresses have been resolved. + if (*hostname_resolution_required_cnt <= 0) { + finishClusterHostnameResolution(slots); + } + }); + } +} + +void RedisCluster::RedisDiscoverySession::finishClusterHostnameResolution( + ClusterSlotsSharedPtr slots) { + parent_.onClusterSlotUpdate(std::move(slots)); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); +} + void RedisCluster::RedisDiscoverySession::onResponse( NetworkFilters::Common::Redis::RespValuePtr&& value) { current_request_ = nullptr; @@ -331,14 +438,31 @@ void RedisCluster::RedisDiscoverySession::onResponse( return; } - auto slots = std::make_unique>(); - + auto cluster_slots = std::make_shared>(); + + // https://redis.io/commands/cluster-slots + // CLUSTER SLOTS represents nested array of redis instances, like this: + // + // 1) 1) (integer) 0 <-- start slot range + // 2) (integer) 5460 <-- end slot range + // + // 3) 1) "127.0.0.1" <-- primary slot IP ADDR(HOSTNAME) + // 2) (integer) 30001 <-- primary slot PORT + // 3) "09dbe9720cda62f7865eabc5fd8857c5d2678366" + // + // 4) 1) "127.0.0.2" <-- replica slot IP ADDR(HOSTNAME) + // 2) (integer) 30004 <-- replica slot PORT + // 3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" + // // Loop through the cluster slot response and error checks for each field. + auto hostname_resolution_required_cnt = std::make_shared(0); for (const NetworkFilters::Common::Redis::RespValue& part : value->asArray()) { if (part.type() != NetworkFilters::Common::Redis::RespType::Array) { onUnexpectedResponse(value); return; } + + // Row 1-2: Slot ranges const std::vector& slot_range = part.asArray(); if (slot_range.size() < 3 || slot_range[SlotRangeStart].type() != @@ -351,29 +475,77 @@ void RedisCluster::RedisDiscoverySession::onResponse( return; } - // Field 2: Primary address for slot range - auto primary_address = ProcessCluster(slot_range[SlotPrimary]); - if (!primary_address) { + // Row 3: Primary slot address + if (!validateCluster(slot_range[SlotPrimary])) { onUnexpectedResponse(value); return; } + // Try to parse primary slot address as IP address + // It may fail in case the address is a hostname. If this is the case - we'll come back later + // and try to resolve hostnames asynchronously. For example, AWS ElastiCache returns hostname + // instead of IP address. + ClusterSlot slot(slot_range[SlotRangeStart].asInteger(), slot_range[SlotRangeEnd].asInteger(), + ipAddressFromClusterEntry(slot_range[SlotPrimary].asArray())); + if (slot.primary() == nullptr) { + // Primary address is potentially a hostname, save it for async DNS resolution. + const auto& array = slot_range[SlotPrimary].asArray(); + slot.primary_hostname_ = array[0].asString(); + slot.primary_port_ = array[1].asInteger(); + (*hostname_resolution_required_cnt)++; + } - slots->emplace_back(slot_range[SlotRangeStart].asInteger(), - slot_range[SlotRangeEnd].asInteger(), primary_address); - + // Row 4-N: Replica(s) addresses for (auto replica = std::next(slot_range.begin(), SlotReplicaStart); replica != slot_range.end(); ++replica) { - auto replica_address = ProcessCluster(*replica); - if (!replica_address) { + if (!validateCluster(*replica)) { onUnexpectedResponse(value); return; } - slots->back().addReplica(std::move(replica_address)); + auto replica_address = ipAddressFromClusterEntry(replica->asArray()); + if (replica_address) { + slot.addReplica(std::move(replica_address)); + } else { + // Replica address is potentially a hostname, save it for async DNS resolution. + const auto& array = replica->asArray(); + slot.addReplicaToResolve(array[0].asString(), array[1].asInteger()); + (*hostname_resolution_required_cnt)++; + } } + cluster_slots->push_back(std::move(slot)); } - parent_.onClusterSlotUpdate(std::move(slots)); - resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + if (*hostname_resolution_required_cnt > 0) { + // DNS resolution is required, defer finalizing the slot update until resolution is complete. + resolveClusterHostnames(std::move(cluster_slots), hostname_resolution_required_cnt); + } else { + // All slots addresses were represented by IP/Port pairs. + parent_.onClusterSlotUpdate(std::move(cluster_slots)); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + } +} + +// Ensure that Slot Cluster response has valid format +bool RedisCluster::RedisDiscoverySession::validateCluster( + const NetworkFilters::Common::Redis::RespValue& value) { + // Verify data types + if (value.type() != NetworkFilters::Common::Redis::RespType::Array) { + return false; + } + const auto& array = value.asArray(); + if (array.size() < 2 || array[0].type() != NetworkFilters::Common::Redis::RespType::BulkString || + array[1].type() != NetworkFilters::Common::Redis::RespType::Integer) { + return false; + } + // Verify IP/Host address + if (array[0].asString().empty()) { + return false; + } + // Verify port + if (array[1].asInteger() > 0xffff) { + return false; + } + + return true; } void RedisCluster::RedisDiscoverySession::onUnexpectedResponse( diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index ae8553b2effa9..74d36679157a0 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -126,7 +126,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { void updateAllHosts(const Upstream::HostVector& hosts_added, const Upstream::HostVector& hosts_removed, uint32_t priority); - void onClusterSlotUpdate(ClusterSlotsPtr&&); + void onClusterSlotUpdate(ClusterSlotsSharedPtr&&); void reloadHealthyHostsHelper(const Upstream::HostSharedPtr& host) override; @@ -242,7 +242,14 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { void onUnexpectedResponse(const NetworkFilters::Common::Redis::RespValuePtr&); Network::Address::InstanceConstSharedPtr - ProcessCluster(const NetworkFilters::Common::Redis::RespValue& value); + ipAddressFromClusterEntry(const std::vector& value); + bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value); + void resolveClusterHostnames(ClusterSlotsSharedPtr&& slots, + std::shared_ptr hostname_resolution_required_cnt); + void resolveReplicas(ClusterSlotsSharedPtr slots, std::size_t index, + std::shared_ptr hostname_resolution_required_cnt); + void finishClusterHostnameResolution(ClusterSlotsSharedPtr slots); + void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response); RedisCluster& parent_; Event::Dispatcher& dispatcher_; diff --git a/source/extensions/clusters/redis/redis_cluster_lb.cc b/source/extensions/clusters/redis/redis_cluster_lb.cc index 13e5af645efcc..c44126e794f72 100644 --- a/source/extensions/clusters/redis/redis_cluster_lb.cc +++ b/source/extensions/clusters/redis/redis_cluster_lb.cc @@ -17,7 +17,7 @@ bool ClusterSlot::operator==(const Envoy::Extensions::Clusters::Redis::ClusterSl } // RedisClusterLoadBalancerFactory -bool RedisClusterLoadBalancerFactory::onClusterSlotUpdate(ClusterSlotsPtr&& slots, +bool RedisClusterLoadBalancerFactory::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, Envoy::Upstream::HostMap& all_hosts) { // The slots is sorted, allowing for a quick comparison to make sure we need to update the slot // array sort based on start and end to enable efficient comparison diff --git a/source/extensions/clusters/redis/redis_cluster_lb.h b/source/extensions/clusters/redis/redis_cluster_lb.h index a4eb2352df6c7..1a311db0dea1e 100644 --- a/source/extensions/clusters/redis/redis_cluster_lb.h +++ b/source/extensions/clusters/redis/redis_cluster_lb.h @@ -25,6 +25,8 @@ namespace Redis { static const uint64_t MaxSlot = 16384; +using ReplicaToResolve = std::pair; + class ClusterSlot { public: ClusterSlot(int64_t start, int64_t end, Network::Address::InstanceConstSharedPtr primary) @@ -36,12 +38,24 @@ class ClusterSlot { const absl::btree_map& replicas() const { return replicas_; } + + void setPrimary(Network::Address::InstanceConstSharedPtr address) { + primary_ = std::move(address); + } void addReplica(Network::Address::InstanceConstSharedPtr replica_address) { replicas_.emplace(replica_address->asString(), std::move(replica_address)); } + void addReplicaToResolve(const std::string& host, uint16_t port) { + replicas_to_resolve_.emplace_back(host, port); + } bool operator==(const ClusterSlot& rhs) const; + // In case of primary slot address is hostname and needs to be resolved + std::string primary_hostname_; + uint16_t primary_port_; + std::vector replicas_to_resolve_; + private: int64_t start_; int64_t end_; @@ -109,7 +123,8 @@ class ClusterSlotUpdateCallBack { * @param all_hosts provides the updated hosts. * @return indicate if the cluster slot is updated or not. */ - virtual bool onClusterSlotUpdate(ClusterSlotsPtr&& slots, Upstream::HostMap& all_hosts) PURE; + virtual bool onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, + Upstream::HostMap& all_hosts) PURE; /** * Callback when a host's health status is updated @@ -129,7 +144,7 @@ class RedisClusterLoadBalancerFactory : public ClusterSlotUpdateCallBack, RedisClusterLoadBalancerFactory(Random::RandomGenerator& random) : random_(random) {} // ClusterSlotUpdateCallBack - bool onClusterSlotUpdate(ClusterSlotsPtr&& slots, Upstream::HostMap& all_hosts) override; + bool onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, Upstream::HostMap& all_hosts) override; void onHostHealthUpdate() override; diff --git a/test/extensions/clusters/redis/mocks.h b/test/extensions/clusters/redis/mocks.h index 1f1531f45be6c..123e09f181a07 100644 --- a/test/extensions/clusters/redis/mocks.h +++ b/test/extensions/clusters/redis/mocks.h @@ -21,7 +21,7 @@ class MockClusterSlotUpdateCallBack : public ClusterSlotUpdateCallBack { MockClusterSlotUpdateCallBack(); ~MockClusterSlotUpdateCallBack() override = default; - MOCK_METHOD(bool, onClusterSlotUpdate, (ClusterSlotsPtr&&, Upstream::HostMap&)); + MOCK_METHOD(bool, onClusterSlotUpdate, (ClusterSlotsSharedPtr&&, Upstream::HostMap&)); MOCK_METHOD(void, onHostHealthUpdate, ()); }; diff --git a/test/extensions/clusters/redis/redis_cluster_integration_test.cc b/test/extensions/clusters/redis/redis_cluster_integration_test.cc index 7a0128d09f00d..b38e9d3c9d63c 100644 --- a/test/extensions/clusters/redis/redis_cluster_integration_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_integration_test.cc @@ -1,3 +1,4 @@ +#include #include #include @@ -157,6 +158,9 @@ class RedisClusterIntegrationTest : public testing::TestWithParammutable_clusters(0); + if (version_ == Network::Address::IpVersion::v4) { + cluster_0->set_dns_lookup_family(envoy::config::cluster::v3::Cluster::V4_ONLY); + } for (int j = 0; j < cluster_0->load_assignment().endpoints_size(); ++j) { auto locality_lb = cluster_0->mutable_load_assignment()->mutable_endpoints(j); for (int k = 0; k < locality_lb->lb_endpoints_size(); ++k) { @@ -319,6 +323,22 @@ class RedisClusterIntegrationTest : public testing::TestWithParamlocalAddress()->ip()->port(), "localhost", + fake_upstreams_[1]->localAddress()->ip()->port()); + + expectCallClusterSlot(random_index_, cluster_slot_response); + }; + + initialize(); + + // foo hashes to slot 12182 which is in upstream 0 + simpleRequestAndResponse(0, makeBulkStringArray({"get", "foo"}), "$3\r\nbar\r\n"); +} + +// This test sends a simple "get foo" command from a fake // This test sends a simple "get foo" command from a fake // downstream client through the proxy to a fake upstream // Redis cluster with 2 slots. The fake server sends a valid response diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc index 8c1fc3cc9f352..f08a41b05eeab 100644 --- a/test/extensions/clusters/redis/redis_cluster_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -193,6 +193,33 @@ class RedisClusterTest : public testing::Test, pool_callbacks_->onFailure(); } + NetworkFilters::Common::Redis::RespValuePtr singleSlotPrimary(const std::string& primary, + int64_t port) const { + std::vector primary_1(2); + primary_1[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + primary_1[0].asString() = primary; + primary_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + primary_1[1].asInteger() = port; + + std::vector slot_1(3); + slot_1[0].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[0].asInteger() = 0; + slot_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[1].asInteger() = 16383; + slot_1[2].type(NetworkFilters::Common::Redis::RespType::Array); + slot_1[2].asArray().swap(primary_1); + + std::vector slots(1); + slots[0].type(NetworkFilters::Common::Redis::RespType::Array); + slots[0].asArray().swap(slot_1); + + NetworkFilters::Common::Redis::RespValuePtr response( + new NetworkFilters::Common::Redis::RespValue()); + response->type(NetworkFilters::Common::Redis::RespType::Array); + response->asArray().swap(slots); + return response; + } + NetworkFilters::Common::Redis::RespValuePtr singleSlotPrimaryReplica(const std::string& primary, const std::string& replica, int64_t port) const { @@ -229,6 +256,50 @@ class RedisClusterTest : public testing::Test, return response; } + NetworkFilters::Common::Redis::RespValuePtr + singleSlotPrimaryWithTwoReplicas(const std::string& primary, const std::string& replica_1, + const std::string& replica_2, int64_t port) const { + std::vector primary_1(2); + primary_1[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + primary_1[0].asString() = primary; + primary_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + primary_1[1].asInteger() = port; + + std::vector repl_1(2); + repl_1[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + repl_1[0].asString() = replica_1; + repl_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + repl_1[1].asInteger() = port; + + std::vector repl_2(2); + repl_2[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + repl_2[0].asString() = replica_2; + repl_2[1].type(NetworkFilters::Common::Redis::RespType::Integer); + repl_2[1].asInteger() = port; + + std::vector slot_1(5); + slot_1[0].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[0].asInteger() = 0; + slot_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[1].asInteger() = 16383; + slot_1[2].type(NetworkFilters::Common::Redis::RespType::Array); + slot_1[2].asArray().swap(primary_1); + slot_1[3].type(NetworkFilters::Common::Redis::RespType::Array); + slot_1[3].asArray().swap(repl_1); + slot_1[4].type(NetworkFilters::Common::Redis::RespType::Array); + slot_1[4].asArray().swap(repl_2); + + std::vector slots(1); + slots[0].type(NetworkFilters::Common::Redis::RespType::Array); + slots[0].asArray().swap(slot_1); + + NetworkFilters::Common::Redis::RespValuePtr response( + new NetworkFilters::Common::Redis::RespValue()); + response->type(NetworkFilters::Common::Redis::RespType::Array); + response->asArray().swap(slots); + return response; + } + NetworkFilters::Common::Redis::RespValuePtr twoSlotsPrimaries() const { std::vector primary_1(2); primary_1[0].type(NetworkFilters::Common::Redis::RespType::BulkString); @@ -398,7 +469,7 @@ class RedisClusterTest : public testing::Test, if (flags.test(primary_ip_value)) { primary_1_array.push_back(createStringField(flags.test(primary_ip_type), "127.0.0.1")); } else { - primary_1_array.push_back(createStringField(flags.test(primary_ip_type), "bad ip foo")); + primary_1_array.push_back(createStringField(flags.test(primary_ip_type), "")); } // Port field. primary_1_array.push_back(createIntegerField(flags.test(primary_port_type), 22120)); @@ -411,8 +482,7 @@ class RedisClusterTest : public testing::Test, replica_1_array.push_back( createStringField(replica_flags.test(replica_ip_type), "127.0.0.2")); } else { - replica_1_array.push_back( - createStringField(replica_flags.test(replica_ip_type), "bad ip bar")); + replica_1_array.push_back(createStringField(replica_flags.test(replica_ip_type), "")); } // Port field. replica_1_array.push_back(createIntegerField(replica_flags.test(replica_port_type), 22120)); @@ -663,6 +733,131 @@ TEST_P(RedisDnsParamTest, ImmediateResolveDns) { expectHealthyHosts(std::get<3>(GetParam())); } +TEST_F(RedisClusterTest, AddressAsHostname) { + setupFromV3Yaml(BasicConfig); + const std::list resolved_addresses{"127.0.0.1", "127.0.0.2"}; + const std::list primary_resolved_addresses{"127.0.1.1"}; + const std::list replica_resolved_addresses{"127.0.1.2"}; + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", resolved_addresses); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "replica.org", + replica_resolved_addresses); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", + primary_resolved_addresses); + expectRedisResolve(true); + + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + cluster_->initialize([&]() -> void { initialized_.ready(); }); + + // 1. Single slot with primary and replica + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + expectClusterSlotResponse(singleSlotPrimaryReplica("primary.com", "replica.org", 22120)); + expectHealthyHosts(std::list({"127.0.1.1:22120", "127.0.1.2:22120"})); + EXPECT_EQ(0U, cluster_->info()->stats().update_failure_.value()); + + // 2. Single slot with just the primary hostname + expectRedisResolve(true); + EXPECT_CALL(membership_updated_, ready()); + resolve_timer_->invokeCallback(); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", + primary_resolved_addresses); + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + expectClusterSlotResponse(singleSlotPrimary("primary.com", 22120)); + expectHealthyHosts(std::list({"127.0.1.1:22120"})); + EXPECT_EQ(0U, cluster_->info()->stats().update_failure_.value()); + + // 2. Single slot with just the primary IP address and replica hostname + expectRedisResolve(); + EXPECT_CALL(membership_updated_, ready()); + resolve_timer_->invokeCallback(); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "replica.org", + replica_resolved_addresses); + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + expectClusterSlotResponse(singleSlotPrimaryReplica("127.0.1.1", "replica.org", 22120)); + expectHealthyHosts(std::list({"127.0.1.1:22120", "127.0.1.2:22120"})); + EXPECT_EQ(0U, cluster_->info()->stats().update_failure_.value()); +} + +TEST_F(RedisClusterTest, AddressAsHostnameFailure) { + setupFromV3Yaml(BasicConfig); + const std::list resolved_addresses{"127.0.0.1", "127.0.0.2"}; + const std::list primary_resolved_addresses{"127.0.1.1"}; + const std::list replica_resolved_addresses{"127.0.1.2"}; + + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", resolved_addresses); + + // 1. Primary resolution is successful, but replica fails. + // Expect cluster slot update to be successful, with just one healthy host, and failure counter to + // be updated. + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", + primary_resolved_addresses); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "replica.org", + replica_resolved_addresses, + Network::DnsResolver::ResolutionStatus::Failure); + expectRedisResolve(true); + + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + cluster_->initialize([&]() -> void { initialized_.ready(); }); + + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + expectClusterSlotResponse(singleSlotPrimaryReplica("primary.com", "replica.org", 22120)); + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + expectHealthyHosts(std::list({"127.0.1.1:22120"})); + EXPECT_EQ(1UL, cluster_->info()->stats().update_failure_.value()); + + // 2. Primary resolution fails, so replica resolution is not even called. + // Expect cluster slot update to be successful, with just one healthy host, and failure counter to + // be updated. + expectRedisResolve(true); + resolve_timer_->invokeCallback(); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", + primary_resolved_addresses, + Network::DnsResolver::ResolutionStatus::Failure); + // NOTE: Intentionally commented out. Replica DNS resolution should even reach. It's here for + // illustrative purposes. + + // expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "replica.org", + // replica_resolved_addresses); + expectClusterSlotResponse(singleSlotPrimaryReplica("primary.com", "replica.org", 22120)); + // healthy hosts is same as before, but failure count increases by 1 + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ(2UL, cluster_->info()->stats().update_failure_.value()); +} + +TEST_F(RedisClusterTest, AddressAsHostnamePartialReplicaFailure) { + setupFromV3Yaml(BasicConfig); + const std::list resolved_addresses{"127.0.0.1", "127.0.0.2"}; + const std::list primary_resolved_addresses{"127.0.1.1"}; + + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", resolved_addresses); + + // 1. Primary resolution is successful, and one of the replica is successful, but the other fails. + // Expect cluster slot update to be successful, with two healthy hosts, and expect failure counter + // to be updated. + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", + primary_resolved_addresses); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "failed-replica.org", + std::list{"127.0.1.2"}, + Network::DnsResolver::ResolutionStatus::Failure); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "success-replica.org", + std::list{"127.0.1.3"}); + + expectRedisResolve(true); + + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + cluster_->initialize([&]() -> void { initialized_.ready(); }); + + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + expectClusterSlotResponse(singleSlotPrimaryWithTwoReplicas("primary.com", "failed-replica.org", + "success-replica.org", 22120)); + EXPECT_EQ(2UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); + expectHealthyHosts(std::list({"127.0.1.1:22120", "127.0.1.3:22120"})); +} + TEST_F(RedisClusterTest, EmptyDnsResponse) { Event::MockTimer* dns_timer = new NiceMock(&dispatcher_); setupFromV3Yaml(BasicConfig); diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index d154b593c1478..3cd5e6b9fe15c 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -1333,3 +1333,4 @@ transid routable vhosts infos +ElastiCache