From fc8bdffe34fadda4085c4719c25c5be84fb93944 Mon Sep 17 00:00:00 2001 From: Konstantin Belyalov Date: Mon, 29 Nov 2021 22:15:33 -0700 Subject: [PATCH 01/21] WIP: adding support for redis cluster dns resolving Signed-off-by: Konstantin Belyalov Signed-off-by: Sai Teja Duthuluri --- .../clusters/redis/redis_cluster.cc | 176 +++++++++++++++--- .../extensions/clusters/redis/redis_cluster.h | 5 +- .../clusters/redis/redis_cluster_lb.h | 13 ++ .../clusters/redis/redis_cluster_test.cc | 26 ++- 4 files changed, 193 insertions(+), 27 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 9a64b58050a08..5156a0834cc35 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -225,22 +225,14 @@ RedisCluster::RedisDiscoverySession::RedisDiscoverySession( // Convert the cluster slot IP/Port response to and address, return null if the response // does not match the expected type. Network::Address::InstanceConstSharedPtr -RedisCluster::RedisDiscoverySession::RedisDiscoverySession::ProcessCluster( +RedisCluster::RedisDiscoverySession::RedisDiscoverySession::processClusterByIP( const NetworkFilters::Common::Redis::RespValue& value) { - if (value.type() != NetworkFilters::Common::Redis::RespType::Array) { - return nullptr; - } - auto& array = value.asArray(); - - if (array.size() < 2 || array[0].type() != NetworkFilters::Common::Redis::RespType::BulkString || - array[1].type() != NetworkFilters::Common::Redis::RespType::Integer) { - return nullptr; - } + const auto& array = value.asArray(); try { return Network::Utility::parseInternetAddress(array[0].asString(), array[1].asInteger(), false); } catch (const EnvoyException& ex) { - ENVOY_LOG(debug, "Invalid ip address in CLUSTER SLOTS response: {}", ex.what()); + // Probably elasticache use case: hostname instead of IP return nullptr; } } @@ -316,6 +308,78 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { current_request_ = client->client_->makeRequest(ClusterSlotsRequest::instance_, *this); } +void RedisCluster::RedisDiscoverySession::updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool emptyResponse) +{ + if (status == Network::DnsResolver::ResolutionStatus::Failure || emptyResponse) { + if (status == Network::DnsResolver::ResolutionStatus::Failure) { + parent_.info_->stats().update_failure_.inc(); + } else { + parent_.info_->stats().update_empty_.inc(); + } + } +} + + +void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPtr&& slots) +{ + // Iterate over all slots replicate and resolve all missing addresses one at a time + for (ClusterSlot& slot : *slots) { + // Resolve primary + if (slot.primary() == nullptr) { + ENVOY_LOG(trace, "starting async DNS resolution for primary slot address {}", slot.primary_hostname_); + parent_.dns_resolver_->resolve(slot.primary_hostname_, parent_.dns_lookup_family_, + [this, &slot, &slots](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { + ENVOY_LOG(trace, "async DNS resolution complete for {}", slot.primary_hostname_); + updateDnsStats(status, response.empty()); + if (status != Network::DnsResolver::ResolutionStatus::Success) { + // Failed + ENVOY_LOG(debug, "Unable to resolve cluster slot primary address {}", slot.primary_hostname_); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + return; + } + // Primary slot address resolved + slot.setPrimary(Network::Utility::getAddressWithPort(*response.front().address_, slot.primary_port_)); + // Continue resolving slot's addresses until everything is resolved + resolveClusterHostnames(std::move(slots)); + } + ); + // do one resolution at a time: once resolved, callback will invoke this function again + return; + } + // Resolve all replicas of the slot, one replica at a time + if (!slot.replicas_to_resolve_.empty()) { + const std::string& host = slot.replicas_to_resolve_.back().first; + uint16_t port = slot.replicas_to_resolve_.back().second; + slot.replicas_to_resolve_.pop_back(); + ENVOY_LOG(trace, "starting async DNS resolution for replica address {}", host); + parent_.dns_resolver_->resolve(host, parent_.dns_lookup_family_, + [this, &slot, &slots, port, host](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { + ENVOY_LOG(trace, "async DNS resolution complete for {}", host); + updateDnsStats(status, response.empty()); + if (status != Network::DnsResolver::ResolutionStatus::Success) { + // Failed + ENVOY_LOG(debug, "Unable to resolve cluster replica address {}", host); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + return; + } + // Replica resolved + slot.addReplica(Network::Utility::getAddressWithPort(*response.front().address_, port)); + // Continue resolving slot's addresses until everything is resolved + resolveClusterHostnames(std::move(slots)); + } + ); + // do one resolution at a time: once resolved, callback will invoke this function again + return; + } + } // of for(clusters slots) + + // All slots addresses were represented by DNS hostname lookup. + parent_.onClusterSlotUpdate(std::move(slots)); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); +} + void RedisCluster::RedisDiscoverySession::onResponse( NetworkFilters::Common::Redis::RespValuePtr&& value) { current_request_ = nullptr; @@ -331,14 +395,30 @@ void RedisCluster::RedisDiscoverySession::onResponse( return; } - auto slots = std::make_unique>(); - + auto cluster_slots = std::make_unique>(); + + // CLUSTER SLOTS represents nested array of redis instances, like this: + // + // 1) 1) (integer) 0 <-- start slot range + // 2) (integer) 5460 <-- end slot range + // + // 3) 1) "127.0.0.1" | + // 2) (integer) 30001 <- master for slot as IP(HOST) / Port / ID + // 3) "09dbe9720cda62f7865eabc5fd8857c5d2678366" | + // + // 4) 1) "127.0.0.1" | + // 2) (integer) 30004 <- replicas in the same format as master + // 3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" | + // // Loop through the cluster slot response and error checks for each field. + bool address_resolve_required = false; for (const NetworkFilters::Common::Redis::RespValue& part : value->asArray()) { if (part.type() != NetworkFilters::Common::Redis::RespType::Array) { onUnexpectedResponse(value); return; } + + // Row 1-2: Slot ranges const std::vector& slot_range = part.asArray(); if (slot_range.size() < 3 || slot_range[SlotRangeStart].type() != @@ -351,29 +431,77 @@ void RedisCluster::RedisDiscoverySession::onResponse( return; } - // Field 2: Primary address for slot range - auto primary_address = ProcessCluster(slot_range[SlotPrimary]); - if (!primary_address) { + // Row 3: Primary slot address + if (!validateCluster(slot_range[SlotPrimary])) { onUnexpectedResponse(value); return; } + // Try to parse primary slot address as IP address + // It may fail in AWS elasticache use case: it uses hostnames instead of IPs. + // If this is the case - we'll come back later and try to resolve hostnames asynchronously. + ClusterSlot slot(slot_range[SlotRangeStart].asInteger(), slot_range[SlotRangeEnd].asInteger(), processClusterByIP(slot_range[SlotPrimary])); + if (slot.primary() == nullptr) { + // Primary address is hostname: save the name for further resolving + const auto& array = slot_range[SlotPrimary].asArray(); + slot.primary_hostname_ = array[0].asString(); + slot.primary_port_ = array[1].asInteger(); + } - slots->emplace_back(slot_range[SlotRangeStart].asInteger(), - slot_range[SlotRangeEnd].asInteger(), primary_address); - + // Row 4-N: Replica(s) addresses for (auto replica = std::next(slot_range.begin(), SlotReplicaStart); replica != slot_range.end(); ++replica) { - auto replica_address = ProcessCluster(*replica); - if (!replica_address) { + if (!validateCluster(*replica)) { onUnexpectedResponse(value); return; } - slots->back().addReplica(std::move(replica_address)); + auto replica_address = processClusterByIP(*replica); + if (replica_address) { + slot.addReplica(std::move(replica_address)); + } else { + // Possible AWS elasticache use case: hostname instead of IP + const auto& array = replica->asArray(); + slot.addReplicaToResolve(array[0].asString(), array[1].asInteger()); + } + } + // If at least one (primary, replicas) address is hostname, schedule DNS resolving + if (slot.primary() == nullptr || !slot.replicas_to_resolve_.empty()) { + address_resolve_required = true; } + cluster_slots->push_back(std::move(slot)); } - parent_.onClusterSlotUpdate(std::move(slots)); - resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + if (!address_resolve_required) { + // All slots addresses were represented by IP/Port pairs. + parent_.onClusterSlotUpdate(std::move(cluster_slots)); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + } else { + // Resolve hostnames, once completed run onClusterSlotUpdate() / enable timer + resolveClusterHostnames(std::move(cluster_slots)); + } +} + +// Ensure that Slot Cluster response has valid format +bool RedisCluster::RedisDiscoverySession::validateCluster(const NetworkFilters::Common::Redis::RespValue& value) +{ + // Verify data types + if (value.type() != NetworkFilters::Common::Redis::RespType::Array) { + return false; + } + const auto& array = value.asArray(); + if (array.size() < 2 || array[0].type() != NetworkFilters::Common::Redis::RespType::BulkString || + array[1].type() != NetworkFilters::Common::Redis::RespType::Integer) { + return false; + } + // Verify IP/Host address + if (array[0].asString().empty()) { + return false; + } + // Verify port + if (array[1].asInteger() > 0xffff) { + return false; + } + + return true; } void RedisCluster::RedisDiscoverySession::onUnexpectedResponse( diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index ae8553b2effa9..d60ee2b6ef828 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -242,7 +242,10 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { void onUnexpectedResponse(const NetworkFilters::Common::Redis::RespValuePtr&); Network::Address::InstanceConstSharedPtr - ProcessCluster(const NetworkFilters::Common::Redis::RespValue& value); + processClusterByIP(const NetworkFilters::Common::Redis::RespValue& value); + bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value); + void resolveClusterHostnames(ClusterSlotsPtr&& slots); + void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool emptyResponse); RedisCluster& parent_; Event::Dispatcher& dispatcher_; diff --git a/source/extensions/clusters/redis/redis_cluster_lb.h b/source/extensions/clusters/redis/redis_cluster_lb.h index a4eb2352df6c7..cb685d4eaffd6 100644 --- a/source/extensions/clusters/redis/redis_cluster_lb.h +++ b/source/extensions/clusters/redis/redis_cluster_lb.h @@ -25,6 +25,8 @@ namespace Redis { static const uint64_t MaxSlot = 16384; +using ReplicaToResolve = std::pair; + class ClusterSlot { public: ClusterSlot(int64_t start, int64_t end, Network::Address::InstanceConstSharedPtr primary) @@ -36,12 +38,23 @@ class ClusterSlot { const absl::btree_map& replicas() const { return replicas_; } + + void setPrimary(Network::Address::InstanceConstSharedPtr address) { + primary_ = std::move(address); + } void addReplica(Network::Address::InstanceConstSharedPtr replica_address) { replicas_.emplace(replica_address->asString(), std::move(replica_address)); } + void addReplicaToResolve(const std::string& host, uint16_t port) { + replicas_to_resolve_.emplace_back(host, port); + } bool operator==(const ClusterSlot& rhs) const; + // In case of primary slot address is hostname and needs to be resolved + std::string primary_hostname_; + uint16_t primary_port_; + std::vector replicas_to_resolve_; private: int64_t start_; int64_t end_; diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc index 8c1fc3cc9f352..9d0ec549b9021 100644 --- a/test/extensions/clusters/redis/redis_cluster_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -398,7 +398,7 @@ class RedisClusterTest : public testing::Test, if (flags.test(primary_ip_value)) { primary_1_array.push_back(createStringField(flags.test(primary_ip_type), "127.0.0.1")); } else { - primary_1_array.push_back(createStringField(flags.test(primary_ip_type), "bad ip foo")); + primary_1_array.push_back(createStringField(flags.test(primary_ip_type), "")); } // Port field. primary_1_array.push_back(createIntegerField(flags.test(primary_port_type), 22120)); @@ -412,7 +412,7 @@ class RedisClusterTest : public testing::Test, createStringField(replica_flags.test(replica_ip_type), "127.0.0.2")); } else { replica_1_array.push_back( - createStringField(replica_flags.test(replica_ip_type), "bad ip bar")); + createStringField(replica_flags.test(replica_ip_type), "")); } // Port field. replica_1_array.push_back(createIntegerField(replica_flags.test(replica_port_type), 22120)); @@ -663,6 +663,28 @@ TEST_P(RedisDnsParamTest, ImmediateResolveDns) { expectHealthyHosts(std::get<3>(GetParam())); } +TEST_F(RedisClusterTest, PrimaryAddressAsHostname) { + setupFromV3Yaml(BasicConfig); + const std::list resolved_addresses{"127.0.0.1", "127.0.0.2"}; + const std::list primary_resolved_addresses{"127.0.1.1"}; + const std::list replica_resolved_addresses{"127.0.1.2"}; + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", + resolved_addresses); + expectRedisResolve(true); + + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + cluster_->initialize([&]() -> void { initialized_.ready(); }); + + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", + primary_resolved_addresses); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "replica.org", + replica_resolved_addresses); + expectClusterSlotResponse(singleSlotPrimaryReplica("primary.com", "replica.org", 22120)); + expectHealthyHosts(std::list({"127.0.1.1:22120", "127.0.1.2:22120"})); +} + TEST_F(RedisClusterTest, EmptyDnsResponse) { Event::MockTimer* dns_timer = new NiceMock(&dispatcher_); setupFromV3Yaml(BasicConfig); From b0d77c56867fabc3abd87cf7918b0f673a995732 Mon Sep 17 00:00:00 2001 From: Konstantin Belyalov Date: Sun, 5 Dec 2021 14:21:15 -0700 Subject: [PATCH 02/21] Format code Signed-off-by: Konstantin Belyalov Signed-off-by: Sai Teja Duthuluri --- .../clusters/redis/redis_cluster.cc | 44 ++++++++++--------- .../clusters/redis/redis_cluster_lb.h | 1 + .../clusters/redis/redis_cluster_test.cc | 10 ++--- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 5156a0834cc35..15a6a1f12e584 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -308,8 +308,8 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { current_request_ = client->client_->makeRequest(ClusterSlotsRequest::instance_, *this); } -void RedisCluster::RedisDiscoverySession::updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool emptyResponse) -{ +void RedisCluster::RedisDiscoverySession::updateDnsStats( + Network::DnsResolver::ResolutionStatus status, bool emptyResponse) { if (status == Network::DnsResolver::ResolutionStatus::Failure || emptyResponse) { if (status == Network::DnsResolver::ResolutionStatus::Failure) { parent_.info_->stats().update_failure_.inc(); @@ -319,31 +319,32 @@ void RedisCluster::RedisDiscoverySession::updateDnsStats(Network::DnsResolver::R } } - -void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPtr&& slots) -{ +void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPtr&& slots) { // Iterate over all slots replicate and resolve all missing addresses one at a time for (ClusterSlot& slot : *slots) { // Resolve primary if (slot.primary() == nullptr) { - ENVOY_LOG(trace, "starting async DNS resolution for primary slot address {}", slot.primary_hostname_); - parent_.dns_resolver_->resolve(slot.primary_hostname_, parent_.dns_lookup_family_, + ENVOY_LOG(trace, "starting async DNS resolution for primary slot address {}", + slot.primary_hostname_); + parent_.dns_resolver_->resolve( + slot.primary_hostname_, parent_.dns_lookup_family_, [this, &slot, &slots](Network::DnsResolver::ResolutionStatus status, - std::list&& response) -> void { + std::list&& response) -> void { ENVOY_LOG(trace, "async DNS resolution complete for {}", slot.primary_hostname_); updateDnsStats(status, response.empty()); if (status != Network::DnsResolver::ResolutionStatus::Success) { // Failed - ENVOY_LOG(debug, "Unable to resolve cluster slot primary address {}", slot.primary_hostname_); + ENVOY_LOG(debug, "Unable to resolve cluster slot primary address {}", + slot.primary_hostname_); resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); return; } // Primary slot address resolved - slot.setPrimary(Network::Utility::getAddressWithPort(*response.front().address_, slot.primary_port_)); + slot.setPrimary(Network::Utility::getAddressWithPort(*response.front().address_, + slot.primary_port_)); // Continue resolving slot's addresses until everything is resolved resolveClusterHostnames(std::move(slots)); - } - ); + }); // do one resolution at a time: once resolved, callback will invoke this function again return; } @@ -353,9 +354,10 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPt uint16_t port = slot.replicas_to_resolve_.back().second; slot.replicas_to_resolve_.pop_back(); ENVOY_LOG(trace, "starting async DNS resolution for replica address {}", host); - parent_.dns_resolver_->resolve(host, parent_.dns_lookup_family_, + parent_.dns_resolver_->resolve( + host, parent_.dns_lookup_family_, [this, &slot, &slots, port, host](Network::DnsResolver::ResolutionStatus status, - std::list&& response) -> void { + std::list&& response) -> void { ENVOY_LOG(trace, "async DNS resolution complete for {}", host); updateDnsStats(status, response.empty()); if (status != Network::DnsResolver::ResolutionStatus::Success) { @@ -368,8 +370,7 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPt slot.addReplica(Network::Utility::getAddressWithPort(*response.front().address_, port)); // Continue resolving slot's addresses until everything is resolved resolveClusterHostnames(std::move(slots)); - } - ); + }); // do one resolution at a time: once resolved, callback will invoke this function again return; } @@ -403,8 +404,8 @@ void RedisCluster::RedisDiscoverySession::onResponse( // 2) (integer) 5460 <-- end slot range // // 3) 1) "127.0.0.1" | - // 2) (integer) 30001 <- master for slot as IP(HOST) / Port / ID - // 3) "09dbe9720cda62f7865eabc5fd8857c5d2678366" | + // 2) (integer) 30001 <- master for slot as IP(HOST) / Port / + // ID 3) "09dbe9720cda62f7865eabc5fd8857c5d2678366" | // // 4) 1) "127.0.0.1" | // 2) (integer) 30004 <- replicas in the same format as master @@ -439,7 +440,8 @@ void RedisCluster::RedisDiscoverySession::onResponse( // Try to parse primary slot address as IP address // It may fail in AWS elasticache use case: it uses hostnames instead of IPs. // If this is the case - we'll come back later and try to resolve hostnames asynchronously. - ClusterSlot slot(slot_range[SlotRangeStart].asInteger(), slot_range[SlotRangeEnd].asInteger(), processClusterByIP(slot_range[SlotPrimary])); + ClusterSlot slot(slot_range[SlotRangeStart].asInteger(), slot_range[SlotRangeEnd].asInteger(), + processClusterByIP(slot_range[SlotPrimary])); if (slot.primary() == nullptr) { // Primary address is hostname: save the name for further resolving const auto& array = slot_range[SlotPrimary].asArray(); @@ -481,8 +483,8 @@ void RedisCluster::RedisDiscoverySession::onResponse( } // Ensure that Slot Cluster response has valid format -bool RedisCluster::RedisDiscoverySession::validateCluster(const NetworkFilters::Common::Redis::RespValue& value) -{ +bool RedisCluster::RedisDiscoverySession::validateCluster( + const NetworkFilters::Common::Redis::RespValue& value) { // Verify data types if (value.type() != NetworkFilters::Common::Redis::RespType::Array) { return false; diff --git a/source/extensions/clusters/redis/redis_cluster_lb.h b/source/extensions/clusters/redis/redis_cluster_lb.h index cb685d4eaffd6..756f75d68befc 100644 --- a/source/extensions/clusters/redis/redis_cluster_lb.h +++ b/source/extensions/clusters/redis/redis_cluster_lb.h @@ -55,6 +55,7 @@ class ClusterSlot { std::string primary_hostname_; uint16_t primary_port_; std::vector replicas_to_resolve_; + private: int64_t start_; int64_t end_; diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc index 9d0ec549b9021..c8eebfae989a0 100644 --- a/test/extensions/clusters/redis/redis_cluster_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -411,8 +411,7 @@ class RedisClusterTest : public testing::Test, replica_1_array.push_back( createStringField(replica_flags.test(replica_ip_type), "127.0.0.2")); } else { - replica_1_array.push_back( - createStringField(replica_flags.test(replica_ip_type), "")); + replica_1_array.push_back(createStringField(replica_flags.test(replica_ip_type), "")); } // Port field. replica_1_array.push_back(createIntegerField(replica_flags.test(replica_port_type), 22120)); @@ -668,8 +667,7 @@ TEST_F(RedisClusterTest, PrimaryAddressAsHostname) { const std::list resolved_addresses{"127.0.0.1", "127.0.0.2"}; const std::list primary_resolved_addresses{"127.0.1.1"}; const std::list replica_resolved_addresses{"127.0.1.2"}; - expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", - resolved_addresses); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", resolved_addresses); expectRedisResolve(true); EXPECT_CALL(membership_updated_, ready()); @@ -678,9 +676,9 @@ TEST_F(RedisClusterTest, PrimaryAddressAsHostname) { EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", - primary_resolved_addresses); + primary_resolved_addresses); expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "replica.org", - replica_resolved_addresses); + replica_resolved_addresses); expectClusterSlotResponse(singleSlotPrimaryReplica("primary.com", "replica.org", 22120)); expectHealthyHosts(std::list({"127.0.1.1:22120", "127.0.1.2:22120"})); } From f8720ee241db5e1953090183a0f14d363c954e4f Mon Sep 17 00:00:00 2001 From: Konstantin Belyalov Date: Sun, 5 Dec 2021 17:32:57 -0700 Subject: [PATCH 03/21] Add ElastiCache into spellcheck Signed-off-by: Konstantin Belyalov Signed-off-by: Sai Teja Duthuluri --- source/extensions/clusters/redis/redis_cluster.cc | 6 +++--- tools/spelling/spelling_dictionary.txt | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 15a6a1f12e584..d03c043d1a14f 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -232,7 +232,7 @@ RedisCluster::RedisDiscoverySession::RedisDiscoverySession::processClusterByIP( try { return Network::Utility::parseInternetAddress(array[0].asString(), array[1].asInteger(), false); } catch (const EnvoyException& ex) { - // Probably elasticache use case: hostname instead of IP + // Probably ElastiCache use case: hostname instead of IP return nullptr; } } @@ -438,7 +438,7 @@ void RedisCluster::RedisDiscoverySession::onResponse( return; } // Try to parse primary slot address as IP address - // It may fail in AWS elasticache use case: it uses hostnames instead of IPs. + // It may fail in AWS ElastiCache use case: it uses hostnames instead of IPs. // If this is the case - we'll come back later and try to resolve hostnames asynchronously. ClusterSlot slot(slot_range[SlotRangeStart].asInteger(), slot_range[SlotRangeEnd].asInteger(), processClusterByIP(slot_range[SlotPrimary])); @@ -460,7 +460,7 @@ void RedisCluster::RedisDiscoverySession::onResponse( if (replica_address) { slot.addReplica(std::move(replica_address)); } else { - // Possible AWS elasticache use case: hostname instead of IP + // Possible AWS ElastiCache use case: hostname instead of IP const auto& array = replica->asArray(); slot.addReplicaToResolve(array[0].asString(), array[1].asInteger()); } diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index d32fe15384b33..6846fccacb5c6 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -1323,3 +1323,4 @@ suri transid routable vhosts +ElastiCache From 8fc72be2a34f41921925a1c0cb2fccd4a3d2d248 Mon Sep 17 00:00:00 2001 From: Konstantin Belyalov Date: Sun, 5 Dec 2021 21:02:37 -0700 Subject: [PATCH 04/21] Fix style Signed-off-by: Konstantin Belyalov Signed-off-by: Sai Teja Duthuluri --- source/extensions/clusters/redis/redis_cluster.cc | 4 ++-- source/extensions/clusters/redis/redis_cluster.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index d03c043d1a14f..bd717ff62aea1 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -309,8 +309,8 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { } void RedisCluster::RedisDiscoverySession::updateDnsStats( - Network::DnsResolver::ResolutionStatus status, bool emptyResponse) { - if (status == Network::DnsResolver::ResolutionStatus::Failure || emptyResponse) { + Network::DnsResolver::ResolutionStatus status, bool empty_response) { + if (status == Network::DnsResolver::ResolutionStatus::Failure || empty_response) { if (status == Network::DnsResolver::ResolutionStatus::Failure) { parent_.info_->stats().update_failure_.inc(); } else { diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index d60ee2b6ef828..654a1e535dcd3 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -245,7 +245,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { processClusterByIP(const NetworkFilters::Common::Redis::RespValue& value); bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value); void resolveClusterHostnames(ClusterSlotsPtr&& slots); - void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool emptyResponse); + void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response); RedisCluster& parent_; Event::Dispatcher& dispatcher_; From b9d8d233bf88a049c4f84363739d9b8f95df0e42 Mon Sep 17 00:00:00 2001 From: Konstantin Belyalov Date: Tue, 7 Dec 2021 07:43:45 -0700 Subject: [PATCH 05/21] copy element from replicas to resolve vector Signed-off-by: Konstantin Belyalov Signed-off-by: Sai Teja Duthuluri --- .../extensions/clusters/redis/redis_cluster.cc | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index bd717ff62aea1..27a905c235da4 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -350,24 +350,24 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPt } // Resolve all replicas of the slot, one replica at a time if (!slot.replicas_to_resolve_.empty()) { - const std::string& host = slot.replicas_to_resolve_.back().first; - uint16_t port = slot.replicas_to_resolve_.back().second; + const auto replica = slot.replicas_to_resolve_.back(); slot.replicas_to_resolve_.pop_back(); - ENVOY_LOG(trace, "starting async DNS resolution for replica address {}", host); + ENVOY_LOG(trace, "starting async DNS resolution for replica address {}", replica.first); parent_.dns_resolver_->resolve( - host, parent_.dns_lookup_family_, - [this, &slot, &slots, port, host](Network::DnsResolver::ResolutionStatus status, - std::list&& response) -> void { - ENVOY_LOG(trace, "async DNS resolution complete for {}", host); + replica.first, parent_.dns_lookup_family_, + [this, &slot, &slots, &replica](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { + ENVOY_LOG(trace, "async DNS resolution complete for {}", replica.first); updateDnsStats(status, response.empty()); if (status != Network::DnsResolver::ResolutionStatus::Success) { // Failed - ENVOY_LOG(debug, "Unable to resolve cluster replica address {}", host); + ENVOY_LOG(debug, "Unable to resolve cluster replica address {}", replica.first); resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); return; } // Replica resolved - slot.addReplica(Network::Utility::getAddressWithPort(*response.front().address_, port)); + slot.addReplica( + Network::Utility::getAddressWithPort(*response.front().address_, replica.second)); // Continue resolving slot's addresses until everything is resolved resolveClusterHostnames(std::move(slots)); }); From d08a215b4c27a4fc86300b51e7ea09a21117f867 Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Wed, 12 Jan 2022 23:38:53 -0800 Subject: [PATCH 06/21] - fix recursion error - refactor code to make more readable, and simplify recursion logic - do not fail entire resolution if a replica resolution fails - add test, fix existing one Co-authored-by: Konstantin Belyalov Signed-off-by: Sai Teja Duthuluri --- .../clusters/redis/redis_cluster.cc | 127 ++++++++++-------- .../extensions/clusters/redis/redis_cluster.h | 3 + .../clusters/redis/redis_cluster_test.cc | 52 ++++++- 3 files changed, 125 insertions(+), 57 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 27a905c235da4..8b8f3aa0684da 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -5,6 +5,7 @@ #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.validate.h" #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h" #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h" +#include namespace Envoy { namespace Extensions { @@ -320,64 +321,82 @@ void RedisCluster::RedisDiscoverySession::updateDnsStats( } void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPtr&& slots) { - // Iterate over all slots replicate and resolve all missing addresses one at a time - for (ClusterSlot& slot : *slots) { - // Resolve primary - if (slot.primary() == nullptr) { - ENVOY_LOG(trace, "starting async DNS resolution for primary slot address {}", - slot.primary_hostname_); - parent_.dns_resolver_->resolve( - slot.primary_hostname_, parent_.dns_lookup_family_, - [this, &slot, &slots](Network::DnsResolver::ResolutionStatus status, - std::list&& response) -> void { - ENVOY_LOG(trace, "async DNS resolution complete for {}", slot.primary_hostname_); - updateDnsStats(status, response.empty()); - if (status != Network::DnsResolver::ResolutionStatus::Success) { - // Failed - ENVOY_LOG(debug, "Unable to resolve cluster slot primary address {}", - slot.primary_hostname_); - resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); - return; - } - // Primary slot address resolved - slot.setPrimary(Network::Utility::getAddressWithPort(*response.front().address_, - slot.primary_port_)); - // Continue resolving slot's addresses until everything is resolved - resolveClusterHostnames(std::move(slots)); - }); - // do one resolution at a time: once resolved, callback will invoke this function again - return; - } - // Resolve all replicas of the slot, one replica at a time - if (!slot.replicas_to_resolve_.empty()) { - const auto replica = slot.replicas_to_resolve_.back(); - slot.replicas_to_resolve_.pop_back(); - ENVOY_LOG(trace, "starting async DNS resolution for replica address {}", replica.first); - parent_.dns_resolver_->resolve( - replica.first, parent_.dns_lookup_family_, - [this, &slot, &slots, &replica](Network::DnsResolver::ResolutionStatus status, - std::list&& response) -> void { - ENVOY_LOG(trace, "async DNS resolution complete for {}", replica.first); - updateDnsStats(status, response.empty()); - if (status != Network::DnsResolver::ResolutionStatus::Success) { - // Failed - ENVOY_LOG(debug, "Unable to resolve cluster replica address {}", replica.first); - resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); - return; - } + std::shared_ptr s = std::make_shared(std::move(slots)); + resolveHostname(s, 0); +} + +void RedisCluster::RedisDiscoverySession::resolveHostname(std::shared_ptr slots, + std::size_t index) { + if (index >= (**slots).size()) { + finishClusterHostnameResolution(std::move(slots)); + return; + } + auto& slot = (**slots)[index]; + if (slot.primary() == nullptr) { + ENVOY_LOG(trace, + "starting async DNS resolution for primary slot address {} at index location {}", + slot.primary_hostname_, index); + parent_.dns_resolver_->resolve( + slot.primary_hostname_, parent_.dns_lookup_family_, + [this, index, + slots = std::move(slots)](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { + auto& slot = (**slots)[index]; + ENVOY_LOG(trace, "async DNS resolution complete for {}", slot.primary_hostname_); + updateDnsStats(status, response.empty()); + // If DNS resolution for a primary fails, we stop resolution for remaining, and reset the timer. + if (status != Network::DnsResolver::ResolutionStatus::Success) { + ENVOY_LOG(error, "Unable to resolve cluster slot primary hostname {}", + slot.primary_hostname_); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + return; + } + // Primary slot address resolved + slot.setPrimary( + Network::Utility::getAddressWithPort(*response.front().address_, slot.primary_port_)); + // Continue on to resolve replicas + resolveReplicas(std::move(slots), index); + }); + } else { + resolveReplicas(std::move(slots), index); + } +} + +void RedisCluster::RedisDiscoverySession::resolveReplicas(std::shared_ptr slots, + std::size_t index) { + auto& slot = (**slots)[index]; + if (!slot.replicas_to_resolve_.empty()) { + const auto replica = slot.replicas_to_resolve_.back(); + slot.replicas_to_resolve_.pop_back(); + ENVOY_LOG(trace, "starting async DNS resolution for replica address {}", replica.first); + parent_.dns_resolver_->resolve( + replica.first, parent_.dns_lookup_family_, + [this, index, slots = std::move(slots), + replica](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { + auto& slot = (**slots)[index]; + ENVOY_LOG(trace, "async DNS resolution complete for {}", replica.first); + updateDnsStats(status, response.empty()); + // If DNS resolution fails here, we move on to resolve other replicas in the list. + // We log a warn message. + if (status != Network::DnsResolver::ResolutionStatus::Success) { + ENVOY_LOG(warn, "Unable to resolve cluster replica address {}", replica.first); + } else { // Replica resolved slot.addReplica( Network::Utility::getAddressWithPort(*response.front().address_, replica.second)); - // Continue resolving slot's addresses until everything is resolved - resolveClusterHostnames(std::move(slots)); - }); - // do one resolution at a time: once resolved, callback will invoke this function again - return; - } - } // of for(clusters slots) + } + // We go back to same index as there may be more replicas + resolveReplicas(std::move(slots), index); + }); + } else { + resolveHostname(std::move(slots), index + 1); + } +} - // All slots addresses were represented by DNS hostname lookup. - parent_.onClusterSlotUpdate(std::move(slots)); +void RedisCluster::RedisDiscoverySession::finishClusterHostnameResolution( + std::shared_ptr slots) { + parent_.onClusterSlotUpdate(std::move(*slots)); resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); } diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index 654a1e535dcd3..34ede7b169bed 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -245,6 +245,9 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { processClusterByIP(const NetworkFilters::Common::Redis::RespValue& value); bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value); void resolveClusterHostnames(ClusterSlotsPtr&& slots); + void resolveHostname(std::shared_ptr slots, std::size_t index); + void resolveReplicas(std::shared_ptr slots, std::size_t index); + void finishClusterHostnameResolution(std::shared_ptr slots); void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response); RedisCluster& parent_; diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc index c8eebfae989a0..b7fa75a3ec7aa 100644 --- a/test/extensions/clusters/redis/redis_cluster_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -662,12 +662,16 @@ TEST_P(RedisDnsParamTest, ImmediateResolveDns) { expectHealthyHosts(std::get<3>(GetParam())); } -TEST_F(RedisClusterTest, PrimaryAddressAsHostname) { +TEST_F(RedisClusterTest, AddressAsHostname) { setupFromV3Yaml(BasicConfig); const std::list resolved_addresses{"127.0.0.1", "127.0.0.2"}; const std::list primary_resolved_addresses{"127.0.1.1"}; const std::list replica_resolved_addresses{"127.0.1.2"}; expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", resolved_addresses); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "replica.org", + replica_resolved_addresses); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", + primary_resolved_addresses); expectRedisResolve(true); EXPECT_CALL(membership_updated_, ready()); @@ -675,12 +679,54 @@ TEST_F(RedisClusterTest, PrimaryAddressAsHostname) { cluster_->initialize([&]() -> void { initialized_.ready(); }); EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + expectClusterSlotResponse(singleSlotPrimaryReplica("primary.com", "replica.org", 22120)); + expectHealthyHosts(std::list({"127.0.1.1:22120", "127.0.1.2:22120"})); + EXPECT_EQ(0U, cluster_->info()->stats().update_failure_.value()); +} + +TEST_F(RedisClusterTest, AddressAsHostnameFailure) { + setupFromV3Yaml(BasicConfig); + const std::list resolved_addresses{"127.0.0.1", "127.0.0.2"}; + const std::list primary_resolved_addresses{"127.0.1.1"}; + const std::list replica_resolved_addresses{"127.0.1.2"}; + + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", resolved_addresses); + + // 1. Primary resolution is successful, but replica fails. + // Expect cluster slot update to be successful, with just one healthy host, and failure counter to be updated. expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", primary_resolved_addresses); expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "replica.org", - replica_resolved_addresses); + replica_resolved_addresses, + Network::DnsResolver::ResolutionStatus::Failure); + expectRedisResolve(true); + + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + cluster_->initialize([&]() -> void { initialized_.ready(); }); + + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); expectClusterSlotResponse(singleSlotPrimaryReplica("primary.com", "replica.org", 22120)); - expectHealthyHosts(std::list({"127.0.1.1:22120", "127.0.1.2:22120"})); + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + expectHealthyHosts(std::list({"127.0.1.1:22120"})); + EXPECT_EQ(1UL, cluster_->info()->stats().update_failure_.value()); + + // 2. Primary resolution fails, so replica resolution is not even called. + // Expect cluster slot update to be successful, with just one healthy host, and failure counter to be updated. + expectRedisResolve(true); + resolve_timer_->invokeCallback(); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", + primary_resolved_addresses, + Network::DnsResolver::ResolutionStatus::Failure); + // NOTE: Intentionally commented out. Replica DNS resolution should even reach. It's here for illustrative purposes. + // expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "replica.org", + // replica_resolved_addresses); + expectClusterSlotResponse(singleSlotPrimaryReplica("primary.com", "replica.org", 22120)); + // healthy hosts is same as before, but failure count increases by 1 + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); + EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts().size()); + EXPECT_EQ(2UL, cluster_->info()->stats().update_failure_.value()); } TEST_F(RedisClusterTest, EmptyDnsResponse) { From 063681e322bf1295f72ecb303bf6b288da0486a4 Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Thu, 13 Jan 2022 00:11:41 -0800 Subject: [PATCH 07/21] run fix_format Signed-off-by: Sai Teja Duthuluri --- source/extensions/clusters/redis/redis_cluster.cc | 6 ++++-- test/extensions/clusters/redis/redis_cluster_test.cc | 12 ++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 8b8f3aa0684da..75b9aace70d1c 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -1,11 +1,12 @@ #include "redis_cluster.h" +#include + #include "envoy/config/cluster/v3/cluster.pb.h" #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.h" #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.validate.h" #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h" #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h" -#include namespace Envoy { namespace Extensions { @@ -344,7 +345,8 @@ void RedisCluster::RedisDiscoverySession::resolveHostname(std::shared_ptrinfo()->stats().update_failure_.value()); // 2. Primary resolution fails, so replica resolution is not even called. - // Expect cluster slot update to be successful, with just one healthy host, and failure counter to be updated. + // Expect cluster slot update to be successful, with just one healthy host, and failure counter to + // be updated. expectRedisResolve(true); resolve_timer_->invokeCallback(); expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", primary_resolved_addresses, Network::DnsResolver::ResolutionStatus::Failure); - // NOTE: Intentionally commented out. Replica DNS resolution should even reach. It's here for illustrative purposes. + // NOTE: Intentionally commented out. Replica DNS resolution should even reach. It's here for + // illustrative purposes. + // expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "replica.org", - // replica_resolved_addresses); + // replica_resolved_addresses); expectClusterSlotResponse(singleSlotPrimaryReplica("primary.com", "replica.org", 22120)); // healthy hosts is same as before, but failure count increases by 1 EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); From 0f5b9d48ba13f19cb1b5421db5cfcfc8a59e17e1 Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Thu, 13 Jan 2022 11:48:03 -0800 Subject: [PATCH 08/21] - fix syntax error introduced due to rebase; fetch address from DnsReponse using addrInfo() - ran code formatter, which changed some unrelated code, but it mostly looks fine. Signed-off-by: Sai Teja Duthuluri --- source/extensions/clusters/redis/redis_cluster.cc | 8 ++++---- test/common/http/header_map_impl_test.cc | 5 ++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 75b9aace70d1c..4d63fbdb15fdf 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -354,8 +354,8 @@ void RedisCluster::RedisDiscoverySession::resolveHostname(std::shared_ptr Date: Thu, 13 Jan 2022 12:56:41 -0800 Subject: [PATCH 09/21] revert the formatting change introduced by tool Signed-off-by: Sai Teja Duthuluri --- test/common/http/header_map_impl_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/common/http/header_map_impl_test.cc b/test/common/http/header_map_impl_test.cc index 7813ff5c1874e..6ecd8d391c415 100644 --- a/test/common/http/header_map_impl_test.cc +++ b/test/common/http/header_map_impl_test.cc @@ -424,7 +424,8 @@ TEST_P(HeaderMapImplTest, AllInlineHeaders) { } { // No request trailer O(1) headers. - } { + } + { auto header_map = ResponseHeaderMapImpl::create(); INLINE_RESP_STRING_HEADERS(TEST_INLINE_STRING_HEADER_FUNCS) INLINE_REQ_RESP_STRING_HEADERS(TEST_INLINE_STRING_HEADER_FUNCS) From 0ed133f6dd81bf168a57f3661e771571903ff463 Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Thu, 13 Jan 2022 13:54:48 -0800 Subject: [PATCH 10/21] fix check_format failure Signed-off-by: Sai Teja Duthuluri --- test/common/http/header_map_impl_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/common/http/header_map_impl_test.cc b/test/common/http/header_map_impl_test.cc index 6ecd8d391c415..1b95f4b49ad14 100644 --- a/test/common/http/header_map_impl_test.cc +++ b/test/common/http/header_map_impl_test.cc @@ -423,7 +423,7 @@ TEST_P(HeaderMapImplTest, AllInlineHeaders) { INLINE_REQ_RESP_STRING_HEADERS(TEST_INLINE_STRING_HEADER_FUNCS) } { - // No request trailer O(1) headers. + // No request trailer O(1) headers. } { auto header_map = ResponseHeaderMapImpl::create(); From 455e6208739f0c47cd8115564ad6faee3efb9816 Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Tue, 18 Jan 2022 21:10:47 -0800 Subject: [PATCH 11/21] - review comments - refactor `resolveReplicas` to resolve all the replicas in a loop, in parallel - add docstrings for `resolveReplicas` and `resolveHostname` Signed-off-by: Sai Teja Duthuluri --- .../clusters/redis/redis_cluster.cc | 111 ++++++++++-------- .../extensions/clusters/redis/redis_cluster.h | 2 +- .../clusters/redis/redis_cluster_test.cc | 75 ++++++++++++ 3 files changed, 139 insertions(+), 49 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 4d63fbdb15fdf..41ac39a7e7032 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -224,19 +224,13 @@ RedisCluster::RedisDiscoverySession::RedisDiscoverySession( NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats( parent_.info()->statsScope().symbolTable())) {} -// Convert the cluster slot IP/Port response to and address, return null if the response +// Convert the cluster slot IP/Port response to an address, return null if the response // does not match the expected type. Network::Address::InstanceConstSharedPtr -RedisCluster::RedisDiscoverySession::RedisDiscoverySession::processClusterByIP( - const NetworkFilters::Common::Redis::RespValue& value) { - - const auto& array = value.asArray(); - try { - return Network::Utility::parseInternetAddress(array[0].asString(), array[1].asInteger(), false); - } catch (const EnvoyException& ex) { - // Probably ElastiCache use case: hostname instead of IP - return nullptr; - } +RedisCluster::RedisDiscoverySession::RedisDiscoverySession::ipAddressFromClusterEntry( + const std::vector& array) { + return Network::Utility::parseInternetAddressNoThrow(array[0].asString(), array[1].asInteger(), + false); } RedisCluster::RedisDiscoverySession::~RedisDiscoverySession() { @@ -312,12 +306,10 @@ void RedisCluster::RedisDiscoverySession::startResolveRedis() { void RedisCluster::RedisDiscoverySession::updateDnsStats( Network::DnsResolver::ResolutionStatus status, bool empty_response) { - if (status == Network::DnsResolver::ResolutionStatus::Failure || empty_response) { - if (status == Network::DnsResolver::ResolutionStatus::Failure) { - parent_.info_->stats().update_failure_.inc(); - } else { - parent_.info_->stats().update_empty_.inc(); - } + if (status == Network::DnsResolver::ResolutionStatus::Failure) { + parent_.info_->stats().update_failure_.inc(); + } else if (empty_response) { + parent_.info_->stats().update_empty_.inc(); } } @@ -326,6 +318,14 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPt resolveHostname(s, 0); } +/** + * Resolve the primary cluster entry hostname in the slot. If the primary is already resolved, + * proceed to resolve replicas. Additionally, if the primary is successfully resolved, we proceed to + * resolve replicas. + * + * @param slots the list of slots which may need DNS resolution + * @param index the specific index into `slots` whose primary cluster entry needs to be resolved + */ void RedisCluster::RedisDiscoverySession::resolveHostname(std::shared_ptr slots, std::size_t index) { if (index >= (**slots).size()) { @@ -364,19 +364,30 @@ void RedisCluster::RedisDiscoverySession::resolveHostname(std::shared_ptr slots, std::size_t index) { auto& slot = (**slots)[index]; - if (!slot.replicas_to_resolve_.empty()) { - const auto replica = slot.replicas_to_resolve_.back(); - slot.replicas_to_resolve_.pop_back(); + if (slot.replicas_to_resolve_.empty()) { + resolveHostname(std::move(slots), index + 1); + } + + std::size_t replica_idx = 0; + while (replica_idx < slot.replicas_to_resolve_.size()) { + auto replica = slot.replicas_to_resolve_[replica_idx]; ENVOY_LOG(trace, "starting async DNS resolution for replica address {}", replica.first); parent_.dns_resolver_->resolve( replica.first, parent_.dns_lookup_family_, - [this, index, slots = std::move(slots), - replica](Network::DnsResolver::ResolutionStatus status, - std::list&& response) -> void { + [this, index, slots, replica_idx](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { auto& slot = (**slots)[index]; + auto& replica = slot.replicas_to_resolve_[replica_idx]; ENVOY_LOG(trace, "async DNS resolution complete for {}", replica.first); updateDnsStats(status, response.empty()); // If DNS resolution fails here, we move on to resolve other replicas in the list. @@ -388,11 +399,13 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas(std::shared_ptr>(); + // https://redis.io/commands/cluster-slots // CLUSTER SLOTS represents nested array of redis instances, like this: // - // 1) 1) (integer) 0 <-- start slot range - // 2) (integer) 5460 <-- end slot range + // 1) 1) (integer) 0 <-- start slot range + // 2) (integer) 5460 <-- end slot range // - // 3) 1) "127.0.0.1" | - // 2) (integer) 30001 <- master for slot as IP(HOST) / Port / - // ID 3) "09dbe9720cda62f7865eabc5fd8857c5d2678366" | + // 3) 1) "127.0.0.1" <-- primary slot IP ADDR(HOSTNAME) + // 2) (integer) 30001 <-- primary slot PORT + // 3) "09dbe9720cda62f7865eabc5fd8857c5d2678366" // - // 4) 1) "127.0.0.1" | - // 2) (integer) 30004 <- replicas in the same format as master - // 3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" | + // 4) 1) "127.0.0.2" <-- replica slot IP ADDR(HOSTNAME) + // 2) (integer) 30004 <-- replica slot PORT + // 3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" // // Loop through the cluster slot response and error checks for each field. - bool address_resolve_required = false; + bool address_resolution_required = false; for (const NetworkFilters::Common::Redis::RespValue& part : value->asArray()) { if (part.type() != NetworkFilters::Common::Redis::RespType::Array) { onUnexpectedResponse(value); @@ -459,12 +473,13 @@ void RedisCluster::RedisDiscoverySession::onResponse( return; } // Try to parse primary slot address as IP address - // It may fail in AWS ElastiCache use case: it uses hostnames instead of IPs. - // If this is the case - we'll come back later and try to resolve hostnames asynchronously. + // It may fail in case the address is a hostname. If this is the case - we'll come back later + // and try to resolve hostnames asynchronously. For example, AWS ElastiCache returns hostname + // instead of IP address. ClusterSlot slot(slot_range[SlotRangeStart].asInteger(), slot_range[SlotRangeEnd].asInteger(), - processClusterByIP(slot_range[SlotPrimary])); + ipAddressFromClusterEntry(slot_range[SlotPrimary].asArray())); if (slot.primary() == nullptr) { - // Primary address is hostname: save the name for further resolving + // Primary address is potentially a hostname, save it for async DNS resolution. const auto& array = slot_range[SlotPrimary].asArray(); slot.primary_hostname_ = array[0].asString(); slot.primary_port_ = array[1].asInteger(); @@ -477,29 +492,29 @@ void RedisCluster::RedisDiscoverySession::onResponse( onUnexpectedResponse(value); return; } - auto replica_address = processClusterByIP(*replica); + auto replica_address = ipAddressFromClusterEntry(replica->asArray()); if (replica_address) { slot.addReplica(std::move(replica_address)); } else { - // Possible AWS ElastiCache use case: hostname instead of IP + // Replica address is potentially a hostname, save it for async DNS resolution. const auto& array = replica->asArray(); slot.addReplicaToResolve(array[0].asString(), array[1].asInteger()); } } - // If at least one (primary, replicas) address is hostname, schedule DNS resolving + // If at least one (primary, replicas) address is hostname, schedule DNS resolution if (slot.primary() == nullptr || !slot.replicas_to_resolve_.empty()) { - address_resolve_required = true; + address_resolution_required = true; } cluster_slots->push_back(std::move(slot)); } - if (!address_resolve_required) { + if (address_resolution_required) { + // DNS resolution is required, defer finalizing the slot update until resolution is complete. + resolveClusterHostnames(std::move(cluster_slots)); + } else { // All slots addresses were represented by IP/Port pairs. parent_.onClusterSlotUpdate(std::move(cluster_slots)); resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); - } else { - // Resolve hostnames, once completed run onClusterSlotUpdate() / enable timer - resolveClusterHostnames(std::move(cluster_slots)); } } diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index 34ede7b169bed..0d59351492a26 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -242,7 +242,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { void onUnexpectedResponse(const NetworkFilters::Common::Redis::RespValuePtr&); Network::Address::InstanceConstSharedPtr - processClusterByIP(const NetworkFilters::Common::Redis::RespValue& value); + ipAddressFromClusterEntry(const std::vector& value); bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value); void resolveClusterHostnames(ClusterSlotsPtr&& slots); void resolveHostname(std::shared_ptr slots, std::size_t index); diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc index ea653c04e3ce8..a1671f9c6ef8f 100644 --- a/test/extensions/clusters/redis/redis_cluster_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -229,6 +229,50 @@ class RedisClusterTest : public testing::Test, return response; } + NetworkFilters::Common::Redis::RespValuePtr + singleSlotPrimaryWithTwoReplicas(const std::string& primary, const std::string& replica_1, + const std::string& replica_2, int64_t port) const { + std::vector primary_1(2); + primary_1[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + primary_1[0].asString() = primary; + primary_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + primary_1[1].asInteger() = port; + + std::vector repl_1(2); + repl_1[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + repl_1[0].asString() = replica_1; + repl_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + repl_1[1].asInteger() = port; + + std::vector repl_2(2); + repl_2[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + repl_2[0].asString() = replica_2; + repl_2[1].type(NetworkFilters::Common::Redis::RespType::Integer); + repl_2[1].asInteger() = port; + + std::vector slot_1(5); + slot_1[0].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[0].asInteger() = 0; + slot_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[1].asInteger() = 16383; + slot_1[2].type(NetworkFilters::Common::Redis::RespType::Array); + slot_1[2].asArray().swap(primary_1); + slot_1[3].type(NetworkFilters::Common::Redis::RespType::Array); + slot_1[3].asArray().swap(repl_1); + slot_1[4].type(NetworkFilters::Common::Redis::RespType::Array); + slot_1[4].asArray().swap(repl_2); + + std::vector slots(1); + slots[0].type(NetworkFilters::Common::Redis::RespType::Array); + slots[0].asArray().swap(slot_1); + + NetworkFilters::Common::Redis::RespValuePtr response( + new NetworkFilters::Common::Redis::RespValue()); + response->type(NetworkFilters::Common::Redis::RespType::Array); + response->asArray().swap(slots); + return response; + } + NetworkFilters::Common::Redis::RespValuePtr twoSlotsPrimaries() const { std::vector primary_1(2); primary_1[0].type(NetworkFilters::Common::Redis::RespType::BulkString); @@ -733,6 +777,37 @@ TEST_F(RedisClusterTest, AddressAsHostnameFailure) { EXPECT_EQ(2UL, cluster_->info()->stats().update_failure_.value()); } +TEST_F(RedisClusterTest, AddressAsHostnamePartialReplicaFailure) { + setupFromV3Yaml(BasicConfig); + const std::list resolved_addresses{"127.0.0.1", "127.0.0.2"}; + const std::list primary_resolved_addresses{"127.0.1.1"}; + + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", resolved_addresses); + + // 1. Primary resolution is successful, and one of the replica is successful, but the other fails. + // Expect cluster slot update to be successful, with two healthy hosts, and expect failure counter + // to be updated. + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", + primary_resolved_addresses); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "failed-replica.org", + std::list{"127.0.1.2"}, + Network::DnsResolver::ResolutionStatus::Failure); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "success-replica.org", + std::list{"127.0.1.3"}); + + expectRedisResolve(true); + + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + cluster_->initialize([&]() -> void { initialized_.ready(); }); + + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + expectClusterSlotResponse(singleSlotPrimaryWithTwoReplicas("primary.com", "failed-replica.org", + "success-replica.org", 22120)); + EXPECT_EQ(2UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size()); + expectHealthyHosts(std::list({"127.0.1.1:22120", "127.0.1.3:22120"})); +} + TEST_F(RedisClusterTest, EmptyDnsResponse) { Event::MockTimer* dns_timer = new NiceMock(&dispatcher_); setupFromV3Yaml(BasicConfig); From 491d5448d72faffb3aaf8798f184e0b91bd6e80d Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Wed, 19 Jan 2022 00:34:32 -0800 Subject: [PATCH 12/21] fix bugprone-use-after-move error Signed-off-by: Sai Teja Duthuluri --- source/extensions/clusters/redis/redis_cluster.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 41ac39a7e7032..104552259410d 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -376,6 +376,7 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas(std::shared_ptr Date: Mon, 31 Jan 2022 14:41:38 -0800 Subject: [PATCH 13/21] Review comments round #2 - simplify cluster entry primary hostname resolution Signed-off-by: Sai Teja Duthuluri --- .../clusters/redis/redis_cluster.cc | 96 +++++++++---------- .../extensions/clusters/redis/redis_cluster.h | 1 - 2 files changed, 47 insertions(+), 50 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 104552259410d..4332ae91e6b3a 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -313,54 +313,48 @@ void RedisCluster::RedisDiscoverySession::updateDnsStats( } } -void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPtr&& slots) { - std::shared_ptr s = std::make_shared(std::move(slots)); - resolveHostname(s, 0); -} - /** - * Resolve the primary cluster entry hostname in the slot. If the primary is already resolved, - * proceed to resolve replicas. Additionally, if the primary is successfully resolved, we proceed to - * resolve replicas. + * Resolve the primary cluster entry hostname in each slot. + * If the primary is successfully resolved, we proceed to resolve replicas. * * @param slots the list of slots which may need DNS resolution - * @param index the specific index into `slots` whose primary cluster entry needs to be resolved */ -void RedisCluster::RedisDiscoverySession::resolveHostname(std::shared_ptr slots, - std::size_t index) { - if (index >= (**slots).size()) { - finishClusterHostnameResolution(std::move(slots)); - return; - } - auto& slot = (**slots)[index]; - if (slot.primary() == nullptr) { - ENVOY_LOG(trace, - "starting async DNS resolution for primary slot address {} at index location {}", - slot.primary_hostname_, index); - parent_.dns_resolver_->resolve( - slot.primary_hostname_, parent_.dns_lookup_family_, - [this, index, - slots = std::move(slots)](Network::DnsResolver::ResolutionStatus status, - std::list&& response) -> void { - auto& slot = (**slots)[index]; - ENVOY_LOG(trace, "async DNS resolution complete for {}", slot.primary_hostname_); - updateDnsStats(status, response.empty()); - // If DNS resolution for a primary fails, we stop resolution for remaining, and reset the - // timer. - if (status != Network::DnsResolver::ResolutionStatus::Success) { - ENVOY_LOG(error, "Unable to resolve cluster slot primary hostname {}", - slot.primary_hostname_); - resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); - return; - } - // Primary slot address resolved - slot.setPrimary(Network::Utility::getAddressWithPort( - *response.front().addrInfo().address_, slot.primary_port_)); - // Continue on to resolve replicas - resolveReplicas(std::move(slots), index); - }); - } else { - resolveReplicas(std::move(slots), index); +void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPtr&& slots) { + std::shared_ptr slots_shared_ptr = + std::make_shared(std::move(slots)); + std::size_t slot_idx = 0; + while (slot_idx < (**slots_shared_ptr).size()) { + ENVOY_LOG(trace, "PRIMARY SLOT INDEX: {}", slot_idx); + auto& slot = (**slots_shared_ptr)[slot_idx]; + if (slot.primary() == nullptr) { + ENVOY_LOG(trace, + "starting async DNS resolution for primary slot address {} at index location {}", + slot.primary_hostname_, index); + parent_.dns_resolver_->resolve( + slot.primary_hostname_, parent_.dns_lookup_family_, + [this, slot_idx, slots_shared_ptr](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { + auto& slot = (**slots_shared_ptr)[slot_idx]; + ENVOY_LOG(trace, "async DNS resolution complete for {}", slot.primary_hostname_); + updateDnsStats(status, response.empty()); + // If DNS resolution for a primary fails, we stop resolution for remaining, and reset + // the timer. + if (status != Network::DnsResolver::ResolutionStatus::Success) { + ENVOY_LOG(error, "Unable to resolve cluster slot primary hostname {}", + slot.primary_hostname_); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); + return; + } + // Primary slot address resolved + slot.setPrimary(Network::Utility::getAddressWithPort( + *response.front().addrInfo().address_, slot.primary_port_)); + // Continue on to resolve replicas + resolveReplicas(std::move(slots_shared_ptr), slot_idx); + }); + } else { + resolveReplicas(std::move(slots_shared_ptr), slot_idx); + } + slot_idx++; } } @@ -375,12 +369,15 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas(std::shared_ptrresolve( @@ -400,10 +397,11 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas(std::shared_ptr& value); bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value); void resolveClusterHostnames(ClusterSlotsPtr&& slots); - void resolveHostname(std::shared_ptr slots, std::size_t index); void resolveReplicas(std::shared_ptr slots, std::size_t index); void finishClusterHostnameResolution(std::shared_ptr slots); void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response); From 59cd98fb8e464daa467fb75f5117b67549a65cd0 Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Mon, 31 Jan 2022 22:04:17 -0800 Subject: [PATCH 14/21] remove envoy logs, and fix static check error Signed-off-by: Sai Teja Duthuluri --- source/extensions/clusters/redis/redis_cluster.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 4332ae91e6b3a..154f796410f45 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -324,12 +324,11 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPt std::make_shared(std::move(slots)); std::size_t slot_idx = 0; while (slot_idx < (**slots_shared_ptr).size()) { - ENVOY_LOG(trace, "PRIMARY SLOT INDEX: {}", slot_idx); auto& slot = (**slots_shared_ptr)[slot_idx]; if (slot.primary() == nullptr) { ENVOY_LOG(trace, "starting async DNS resolution for primary slot address {} at index location {}", - slot.primary_hostname_, index); + slot.primary_hostname_, slot_idx); parent_.dns_resolver_->resolve( slot.primary_hostname_, parent_.dns_lookup_family_, [this, slot_idx, slots_shared_ptr](Network::DnsResolver::ResolutionStatus status, @@ -352,7 +351,7 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPt resolveReplicas(std::move(slots_shared_ptr), slot_idx); }); } else { - resolveReplicas(std::move(slots_shared_ptr), slot_idx); + resolveReplicas(slots_shared_ptr, slot_idx); } slot_idx++; } @@ -377,7 +376,6 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas(std::shared_ptrresolve( From f0de9a19596a00d5ed9a3d87bf4d97ae0c74f029 Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Tue, 1 Feb 2022 00:30:58 -0800 Subject: [PATCH 15/21] update methods to accept ClusterSlotsSharedPtr Signed-off-by: Sai Teja Duthuluri --- .../clusters/redis/redis_cluster.cc | 41 +++++++++---------- .../extensions/clusters/redis/redis_cluster.h | 8 ++-- .../clusters/redis/redis_cluster_lb.cc | 2 +- .../clusters/redis/redis_cluster_lb.h | 5 ++- test/extensions/clusters/redis/mocks.h | 2 +- 5 files changed, 28 insertions(+), 30 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 154f796410f45..3312783750cbb 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -92,7 +92,7 @@ void RedisCluster::updateAllHosts(const Upstream::HostVector& hosts_added, hosts_added, hosts_removed, absl::nullopt); } -void RedisCluster::onClusterSlotUpdate(ClusterSlotsPtr&& slots) { +void RedisCluster::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots) { Upstream::HostVector new_hosts; absl::flat_hash_set all_new_hosts; @@ -319,21 +319,19 @@ void RedisCluster::RedisDiscoverySession::updateDnsStats( * * @param slots the list of slots which may need DNS resolution */ -void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPtr&& slots) { - std::shared_ptr slots_shared_ptr = - std::make_shared(std::move(slots)); +void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsSharedPtr&& slots) { std::size_t slot_idx = 0; - while (slot_idx < (**slots_shared_ptr).size()) { - auto& slot = (**slots_shared_ptr)[slot_idx]; + while (slot_idx < (*slots).size()) { + auto& slot = (*slots)[slot_idx]; if (slot.primary() == nullptr) { ENVOY_LOG(trace, "starting async DNS resolution for primary slot address {} at index location {}", slot.primary_hostname_, slot_idx); parent_.dns_resolver_->resolve( slot.primary_hostname_, parent_.dns_lookup_family_, - [this, slot_idx, slots_shared_ptr](Network::DnsResolver::ResolutionStatus status, - std::list&& response) -> void { - auto& slot = (**slots_shared_ptr)[slot_idx]; + [this, slot_idx, slots](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { + auto& slot = (*slots)[slot_idx]; ENVOY_LOG(trace, "async DNS resolution complete for {}", slot.primary_hostname_); updateDnsStats(status, response.empty()); // If DNS resolution for a primary fails, we stop resolution for remaining, and reset @@ -348,10 +346,10 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPt slot.setPrimary(Network::Utility::getAddressWithPort( *response.front().addrInfo().address_, slot.primary_port_)); // Continue on to resolve replicas - resolveReplicas(std::move(slots_shared_ptr), slot_idx); + resolveReplicas(slots, slot_idx); }); } else { - resolveReplicas(slots_shared_ptr, slot_idx); + resolveReplicas(slots, slot_idx); } slot_idx++; } @@ -364,12 +362,12 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsPt * @param slots the list of slots which may need DNS resolution * @param index the specific index into `slots` whose replicas need to be resolved */ -void RedisCluster::RedisDiscoverySession::resolveReplicas(std::shared_ptr slots, +void RedisCluster::RedisDiscoverySession::resolveReplicas(ClusterSlotsSharedPtr slots, std::size_t index) { - auto& slot = (**slots)[index]; + auto& slot = (*slots)[index]; if (slot.replicas_to_resolve_.empty()) { - if (index == (**slots).size() - 1) { - finishClusterHostnameResolution(std::move(slots)); + if (index == (*slots).size() - 1) { + finishClusterHostnameResolution(slots); } return; } @@ -382,7 +380,7 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas(std::shared_ptr&& response) -> void { - auto& slot = (**slots)[index]; + auto& slot = (*slots)[index]; auto& replica = slot.replicas_to_resolve_[replica_idx]; ENVOY_LOG(trace, "async DNS resolution complete for {}", replica.first); updateDnsStats(status, response.empty()); @@ -397,9 +395,8 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas(std::shared_ptr slots) { - parent_.onClusterSlotUpdate(std::move(*slots)); + ClusterSlotsSharedPtr slots) { + parent_.onClusterSlotUpdate(std::move(slots)); resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); } @@ -427,7 +424,7 @@ void RedisCluster::RedisDiscoverySession::onResponse( return; } - auto cluster_slots = std::make_unique>(); + auto cluster_slots = std::make_shared>(); // https://redis.io/commands/cluster-slots // CLUSTER SLOTS represents nested array of redis instances, like this: diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index 44a5d67e96697..4faa20caf22a4 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -126,7 +126,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { void updateAllHosts(const Upstream::HostVector& hosts_added, const Upstream::HostVector& hosts_removed, uint32_t priority); - void onClusterSlotUpdate(ClusterSlotsPtr&&); + void onClusterSlotUpdate(ClusterSlotsSharedPtr&&); void reloadHealthyHostsHelper(const Upstream::HostSharedPtr& host) override; @@ -244,9 +244,9 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { Network::Address::InstanceConstSharedPtr ipAddressFromClusterEntry(const std::vector& value); bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value); - void resolveClusterHostnames(ClusterSlotsPtr&& slots); - void resolveReplicas(std::shared_ptr slots, std::size_t index); - void finishClusterHostnameResolution(std::shared_ptr slots); + void resolveClusterHostnames(ClusterSlotsSharedPtr&& slots); + void resolveReplicas(ClusterSlotsSharedPtr slots, std::size_t index); + void finishClusterHostnameResolution(ClusterSlotsSharedPtr slots); void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response); RedisCluster& parent_; diff --git a/source/extensions/clusters/redis/redis_cluster_lb.cc b/source/extensions/clusters/redis/redis_cluster_lb.cc index 13e5af645efcc..c44126e794f72 100644 --- a/source/extensions/clusters/redis/redis_cluster_lb.cc +++ b/source/extensions/clusters/redis/redis_cluster_lb.cc @@ -17,7 +17,7 @@ bool ClusterSlot::operator==(const Envoy::Extensions::Clusters::Redis::ClusterSl } // RedisClusterLoadBalancerFactory -bool RedisClusterLoadBalancerFactory::onClusterSlotUpdate(ClusterSlotsPtr&& slots, +bool RedisClusterLoadBalancerFactory::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, Envoy::Upstream::HostMap& all_hosts) { // The slots is sorted, allowing for a quick comparison to make sure we need to update the slot // array sort based on start and end to enable efficient comparison diff --git a/source/extensions/clusters/redis/redis_cluster_lb.h b/source/extensions/clusters/redis/redis_cluster_lb.h index 756f75d68befc..1a311db0dea1e 100644 --- a/source/extensions/clusters/redis/redis_cluster_lb.h +++ b/source/extensions/clusters/redis/redis_cluster_lb.h @@ -123,7 +123,8 @@ class ClusterSlotUpdateCallBack { * @param all_hosts provides the updated hosts. * @return indicate if the cluster slot is updated or not. */ - virtual bool onClusterSlotUpdate(ClusterSlotsPtr&& slots, Upstream::HostMap& all_hosts) PURE; + virtual bool onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, + Upstream::HostMap& all_hosts) PURE; /** * Callback when a host's health status is updated @@ -143,7 +144,7 @@ class RedisClusterLoadBalancerFactory : public ClusterSlotUpdateCallBack, RedisClusterLoadBalancerFactory(Random::RandomGenerator& random) : random_(random) {} // ClusterSlotUpdateCallBack - bool onClusterSlotUpdate(ClusterSlotsPtr&& slots, Upstream::HostMap& all_hosts) override; + bool onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, Upstream::HostMap& all_hosts) override; void onHostHealthUpdate() override; diff --git a/test/extensions/clusters/redis/mocks.h b/test/extensions/clusters/redis/mocks.h index 1f1531f45be6c..123e09f181a07 100644 --- a/test/extensions/clusters/redis/mocks.h +++ b/test/extensions/clusters/redis/mocks.h @@ -21,7 +21,7 @@ class MockClusterSlotUpdateCallBack : public ClusterSlotUpdateCallBack { MockClusterSlotUpdateCallBack(); ~MockClusterSlotUpdateCallBack() override = default; - MOCK_METHOD(bool, onClusterSlotUpdate, (ClusterSlotsPtr&&, Upstream::HostMap&)); + MOCK_METHOD(bool, onClusterSlotUpdate, (ClusterSlotsSharedPtr&&, Upstream::HostMap&)); MOCK_METHOD(void, onHostHealthUpdate, ()); }; From 68b492fd634b4299391d388712a6b67b827b3b29 Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Tue, 1 Feb 2022 02:34:11 -0800 Subject: [PATCH 16/21] write tests to increase coverage Signed-off-by: Sai Teja Duthuluri --- .../clusters/redis/redis_cluster_test.cc | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc index a1671f9c6ef8f..f08a41b05eeab 100644 --- a/test/extensions/clusters/redis/redis_cluster_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -193,6 +193,33 @@ class RedisClusterTest : public testing::Test, pool_callbacks_->onFailure(); } + NetworkFilters::Common::Redis::RespValuePtr singleSlotPrimary(const std::string& primary, + int64_t port) const { + std::vector primary_1(2); + primary_1[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + primary_1[0].asString() = primary; + primary_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + primary_1[1].asInteger() = port; + + std::vector slot_1(3); + slot_1[0].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[0].asInteger() = 0; + slot_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[1].asInteger() = 16383; + slot_1[2].type(NetworkFilters::Common::Redis::RespType::Array); + slot_1[2].asArray().swap(primary_1); + + std::vector slots(1); + slots[0].type(NetworkFilters::Common::Redis::RespType::Array); + slots[0].asArray().swap(slot_1); + + NetworkFilters::Common::Redis::RespValuePtr response( + new NetworkFilters::Common::Redis::RespValue()); + response->type(NetworkFilters::Common::Redis::RespType::Array); + response->asArray().swap(slots); + return response; + } + NetworkFilters::Common::Redis::RespValuePtr singleSlotPrimaryReplica(const std::string& primary, const std::string& replica, int64_t port) const { @@ -722,10 +749,33 @@ TEST_F(RedisClusterTest, AddressAsHostname) { EXPECT_CALL(initialized_, ready()); cluster_->initialize([&]() -> void { initialized_.ready(); }); + // 1. Single slot with primary and replica EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); expectClusterSlotResponse(singleSlotPrimaryReplica("primary.com", "replica.org", 22120)); expectHealthyHosts(std::list({"127.0.1.1:22120", "127.0.1.2:22120"})); EXPECT_EQ(0U, cluster_->info()->stats().update_failure_.value()); + + // 2. Single slot with just the primary hostname + expectRedisResolve(true); + EXPECT_CALL(membership_updated_, ready()); + resolve_timer_->invokeCallback(); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "primary.com", + primary_resolved_addresses); + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + expectClusterSlotResponse(singleSlotPrimary("primary.com", 22120)); + expectHealthyHosts(std::list({"127.0.1.1:22120"})); + EXPECT_EQ(0U, cluster_->info()->stats().update_failure_.value()); + + // 2. Single slot with just the primary IP address and replica hostname + expectRedisResolve(); + EXPECT_CALL(membership_updated_, ready()); + resolve_timer_->invokeCallback(); + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "replica.org", + replica_resolved_addresses); + EXPECT_CALL(*cluster_callback_, onClusterSlotUpdate(_, _)); + expectClusterSlotResponse(singleSlotPrimaryReplica("127.0.1.1", "replica.org", 22120)); + expectHealthyHosts(std::list({"127.0.1.1:22120", "127.0.1.2:22120"})); + EXPECT_EQ(0U, cluster_->info()->stats().update_failure_.value()); } TEST_F(RedisClusterTest, AddressAsHostnameFailure) { From 75115c0ec59dd9ca10be05a8779f3670059c3575 Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Wed, 2 Feb 2022 12:43:03 -0800 Subject: [PATCH 17/21] - review comments round 3 - use `slots->size()` instead of `(*slots).size()` - use counter to track number of hostnames to be resolved. Signed-off-by: Sai Teja Duthuluri --- .../clusters/redis/redis_cluster.cc | 59 ++++++++++--------- .../extensions/clusters/redis/redis_cluster.h | 6 +- 2 files changed, 35 insertions(+), 30 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 3312783750cbb..98e0f494b7c8d 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -1,5 +1,6 @@ #include "redis_cluster.h" +#include #include #include "envoy/config/cluster/v3/cluster.pb.h" @@ -316,12 +317,15 @@ void RedisCluster::RedisDiscoverySession::updateDnsStats( /** * Resolve the primary cluster entry hostname in each slot. * If the primary is successfully resolved, we proceed to resolve replicas. + * We use the count of hostnames that require resolution to decide when the resolution process is + * completed, and then call the post-resolution hooks. * * @param slots the list of slots which may need DNS resolution + * @param address_resolution_required_cnt the number of hostnames that need DNS resolution */ -void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsSharedPtr&& slots) { - std::size_t slot_idx = 0; - while (slot_idx < (*slots).size()) { +void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( + ClusterSlotsSharedPtr&& slots, uint64_t& hostname_resolution_required_cnt) { + for (uint64_t slot_idx = 0; slot_idx < slots->size(); slot_idx++) { auto& slot = (*slots)[slot_idx]; if (slot.primary() == nullptr) { ENVOY_LOG(trace, @@ -329,8 +333,9 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsSh slot.primary_hostname_, slot_idx); parent_.dns_resolver_->resolve( slot.primary_hostname_, parent_.dns_lookup_family_, - [this, slot_idx, slots](Network::DnsResolver::ResolutionStatus status, - std::list&& response) -> void { + [this, slot_idx, slots, + &hostname_resolution_required_cnt](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { auto& slot = (*slots)[slot_idx]; ENVOY_LOG(trace, "async DNS resolution complete for {}", slot.primary_hostname_); updateDnsStats(status, response.empty()); @@ -345,41 +350,42 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(ClusterSlotsSh // Primary slot address resolved slot.setPrimary(Network::Utility::getAddressWithPort( *response.front().addrInfo().address_, slot.primary_port_)); + hostname_resolution_required_cnt--; // Continue on to resolve replicas - resolveReplicas(slots, slot_idx); + resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt); }); } else { - resolveReplicas(slots, slot_idx); + resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt); } - slot_idx++; } } /** - * Resolve the replicas in a cluster entry. If there are no replicas, simply move to the next - * cluster entry. Once the last replica is resolved, we move to resolving the next cluster entry. + * Resolve the replicas in a cluster entry. If there are no replicas, simply return. + * If all the hostnames have been resolved, call post-resolution methods. * * @param slots the list of slots which may need DNS resolution * @param index the specific index into `slots` whose replicas need to be resolved + * @param address_resolution_required_cnt the number of address that need to be resolved */ -void RedisCluster::RedisDiscoverySession::resolveReplicas(ClusterSlotsSharedPtr slots, - std::size_t index) { +void RedisCluster::RedisDiscoverySession::resolveReplicas( + ClusterSlotsSharedPtr slots, std::size_t index, uint64_t& hostname_resolution_required_cnt) { auto& slot = (*slots)[index]; if (slot.replicas_to_resolve_.empty()) { - if (index == (*slots).size() - 1) { + if (hostname_resolution_required_cnt == 0) { finishClusterHostnameResolution(slots); } return; } - std::size_t replica_idx = 0; - while (replica_idx < slot.replicas_to_resolve_.size()) { + for (uint64_t replica_idx = 0; replica_idx < slot.replicas_to_resolve_.size(); replica_idx++) { auto replica = slot.replicas_to_resolve_[replica_idx]; ENVOY_LOG(trace, "starting async DNS resolution for replica address {}", replica.first); parent_.dns_resolver_->resolve( replica.first, parent_.dns_lookup_family_, - [this, index, slots, replica_idx](Network::DnsResolver::ResolutionStatus status, - std::list&& response) -> void { + [this, index, slots, replica_idx, + &hostname_resolution_required_cnt](Network::DnsResolver::ResolutionStatus status, + std::list&& response) -> void { auto& slot = (*slots)[index]; auto& replica = slot.replicas_to_resolve_[replica_idx]; ENVOY_LOG(trace, "async DNS resolution complete for {}", replica.first); @@ -393,13 +399,12 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas(ClusterSlotsSharedPtr slot.addReplica(Network::Utility::getAddressWithPort( *response.front().addrInfo().address_, replica.second)); } - // finish resolution if it's the last primary and last replica of that primary being - // resolved - if (index == (*slots).size() - 1 && replica_idx == slot.replicas_to_resolve_.size() - 1) { + hostname_resolution_required_cnt--; + // finish resolution if all the addresses have been resolved. + if (hostname_resolution_required_cnt <= 0) { finishClusterHostnameResolution(slots); } }); - replica_idx++; } } @@ -441,7 +446,7 @@ void RedisCluster::RedisDiscoverySession::onResponse( // 3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" // // Loop through the cluster slot response and error checks for each field. - bool address_resolution_required = false; + uint64_t hostname_resolution_required_cnt = 0; for (const NetworkFilters::Common::Redis::RespValue& part : value->asArray()) { if (part.type() != NetworkFilters::Common::Redis::RespType::Array) { onUnexpectedResponse(value); @@ -477,6 +482,7 @@ void RedisCluster::RedisDiscoverySession::onResponse( const auto& array = slot_range[SlotPrimary].asArray(); slot.primary_hostname_ = array[0].asString(); slot.primary_port_ = array[1].asInteger(); + hostname_resolution_required_cnt++; } // Row 4-N: Replica(s) addresses @@ -493,18 +499,15 @@ void RedisCluster::RedisDiscoverySession::onResponse( // Replica address is potentially a hostname, save it for async DNS resolution. const auto& array = replica->asArray(); slot.addReplicaToResolve(array[0].asString(), array[1].asInteger()); + hostname_resolution_required_cnt++; } } - // If at least one (primary, replicas) address is hostname, schedule DNS resolution - if (slot.primary() == nullptr || !slot.replicas_to_resolve_.empty()) { - address_resolution_required = true; - } cluster_slots->push_back(std::move(slot)); } - if (address_resolution_required) { + if (hostname_resolution_required_cnt > 0) { // DNS resolution is required, defer finalizing the slot update until resolution is complete. - resolveClusterHostnames(std::move(cluster_slots)); + resolveClusterHostnames(std::move(cluster_slots), hostname_resolution_required_cnt); } else { // All slots addresses were represented by IP/Port pairs. parent_.onClusterSlotUpdate(std::move(cluster_slots)); diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index 4faa20caf22a4..1361cb52b46ad 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -244,8 +244,10 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { Network::Address::InstanceConstSharedPtr ipAddressFromClusterEntry(const std::vector& value); bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value); - void resolveClusterHostnames(ClusterSlotsSharedPtr&& slots); - void resolveReplicas(ClusterSlotsSharedPtr slots, std::size_t index); + void resolveClusterHostnames(ClusterSlotsSharedPtr&& slots, + uint64_t& hostname_resolution_required_cnt); + void resolveReplicas(ClusterSlotsSharedPtr slots, std::size_t index, + uint64_t& hostname_resolution_required_cnt); void finishClusterHostnameResolution(ClusterSlotsSharedPtr slots); void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response); From 9bab9bd750c0609d2dfe1cb7b057cc408290a16d Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Fri, 18 Feb 2022 10:14:55 -0800 Subject: [PATCH 18/21] add integration test Signed-off-by: Sai Teja Duthuluri --- .../redis/redis_cluster_integration_test.cc | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/test/extensions/clusters/redis/redis_cluster_integration_test.cc b/test/extensions/clusters/redis/redis_cluster_integration_test.cc index 7a0128d09f00d..0452b85dd3ed7 100644 --- a/test/extensions/clusters/redis/redis_cluster_integration_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_integration_test.cc @@ -1,3 +1,4 @@ +#include #include #include @@ -319,6 +320,22 @@ class RedisClusterIntegrationTest : public testing::TestWithParamlocalAddress()->ip()->port(), "localhost", + fake_upstreams_[1]->localAddress()->ip()->port()); + + expectCallClusterSlot(random_index_, cluster_slot_response); + }; + + initialize(); + + // foo hashes to slot 12182 which is in upstream 0 + simpleRequestAndResponse(0, makeBulkStringArray({"get", "foo"}), "$3\r\nbar\r\n"); +} + +// This test sends a simple "get foo" command from a fake // This test sends a simple "get foo" command from a fake // downstream client through the proxy to a fake upstream // Redis cluster with 2 slots. The fake server sends a valid response From 9aa8772f99d24cc6e85c24c8fe0f202b2d878bb5 Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Fri, 18 Feb 2022 14:51:26 -0800 Subject: [PATCH 19/21] change the dns lookup family Signed-off-by: Sai Teja Duthuluri --- source/extensions/clusters/redis/redis_cluster.cc | 8 ++++---- .../clusters/redis/redis_cluster_integration_test.cc | 3 +++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 98e0f494b7c8d..1273243758490 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -328,7 +328,7 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( for (uint64_t slot_idx = 0; slot_idx < slots->size(); slot_idx++) { auto& slot = (*slots)[slot_idx]; if (slot.primary() == nullptr) { - ENVOY_LOG(trace, + ENVOY_LOG(debug, "starting async DNS resolution for primary slot address {} at index location {}", slot.primary_hostname_, slot_idx); parent_.dns_resolver_->resolve( @@ -337,7 +337,7 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( &hostname_resolution_required_cnt](Network::DnsResolver::ResolutionStatus status, std::list&& response) -> void { auto& slot = (*slots)[slot_idx]; - ENVOY_LOG(trace, "async DNS resolution complete for {}", slot.primary_hostname_); + ENVOY_LOG(debug, "async DNS resolution complete for {}", slot.primary_hostname_); updateDnsStats(status, response.empty()); // If DNS resolution for a primary fails, we stop resolution for remaining, and reset // the timer. @@ -380,7 +380,7 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas( for (uint64_t replica_idx = 0; replica_idx < slot.replicas_to_resolve_.size(); replica_idx++) { auto replica = slot.replicas_to_resolve_[replica_idx]; - ENVOY_LOG(trace, "starting async DNS resolution for replica address {}", replica.first); + ENVOY_LOG(debug, "starting async DNS resolution for replica address {}", replica.first); parent_.dns_resolver_->resolve( replica.first, parent_.dns_lookup_family_, [this, index, slots, replica_idx, @@ -388,7 +388,7 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas( std::list&& response) -> void { auto& slot = (*slots)[index]; auto& replica = slot.replicas_to_resolve_[replica_idx]; - ENVOY_LOG(trace, "async DNS resolution complete for {}", replica.first); + ENVOY_LOG(debug, "async DNS resolution complete for {}", replica.first); updateDnsStats(status, response.empty()); // If DNS resolution fails here, we move on to resolve other replicas in the list. // We log a warn message. diff --git a/test/extensions/clusters/redis/redis_cluster_integration_test.cc b/test/extensions/clusters/redis/redis_cluster_integration_test.cc index 0452b85dd3ed7..b38e9d3c9d63c 100644 --- a/test/extensions/clusters/redis/redis_cluster_integration_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_integration_test.cc @@ -158,6 +158,9 @@ class RedisClusterIntegrationTest : public testing::TestWithParammutable_clusters(0); + if (version_ == Network::Address::IpVersion::v4) { + cluster_0->set_dns_lookup_family(envoy::config::cluster::v3::Cluster::V4_ONLY); + } for (int j = 0; j < cluster_0->load_assignment().endpoints_size(); ++j) { auto locality_lb = cluster_0->mutable_load_assignment()->mutable_endpoints(j); for (int k = 0; k < locality_lb->lb_endpoints_size(); ++k) { From 5e2f4f1f95a406fcd851c357cb308bc48545d13a Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Fri, 18 Feb 2022 17:00:31 -0800 Subject: [PATCH 20/21] make the integer a shared pointer Signed-off-by: Sai Teja Duthuluri --- .../clusters/redis/redis_cluster.cc | 22 ++++++++++--------- .../extensions/clusters/redis/redis_cluster.h | 4 ++-- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 1273243758490..b0de4f897aef5 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -324,7 +324,8 @@ void RedisCluster::RedisDiscoverySession::updateDnsStats( * @param address_resolution_required_cnt the number of hostnames that need DNS resolution */ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( - ClusterSlotsSharedPtr&& slots, uint64_t& hostname_resolution_required_cnt) { + ClusterSlotsSharedPtr&& slots, + std::shared_ptr hostname_resolution_required_cnt) { for (uint64_t slot_idx = 0; slot_idx < slots->size(); slot_idx++) { auto& slot = (*slots)[slot_idx]; if (slot.primary() == nullptr) { @@ -350,7 +351,7 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( // Primary slot address resolved slot.setPrimary(Network::Utility::getAddressWithPort( *response.front().addrInfo().address_, slot.primary_port_)); - hostname_resolution_required_cnt--; + (*hostname_resolution_required_cnt)--; // Continue on to resolve replicas resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt); }); @@ -369,10 +370,11 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( * @param address_resolution_required_cnt the number of address that need to be resolved */ void RedisCluster::RedisDiscoverySession::resolveReplicas( - ClusterSlotsSharedPtr slots, std::size_t index, uint64_t& hostname_resolution_required_cnt) { + ClusterSlotsSharedPtr slots, std::size_t index, + std::shared_ptr hostname_resolution_required_cnt) { auto& slot = (*slots)[index]; if (slot.replicas_to_resolve_.empty()) { - if (hostname_resolution_required_cnt == 0) { + if (*hostname_resolution_required_cnt == 0) { finishClusterHostnameResolution(slots); } return; @@ -399,9 +401,9 @@ void RedisCluster::RedisDiscoverySession::resolveReplicas( slot.addReplica(Network::Utility::getAddressWithPort( *response.front().addrInfo().address_, replica.second)); } - hostname_resolution_required_cnt--; + (*hostname_resolution_required_cnt)--; // finish resolution if all the addresses have been resolved. - if (hostname_resolution_required_cnt <= 0) { + if (*hostname_resolution_required_cnt <= 0) { finishClusterHostnameResolution(slots); } }); @@ -446,7 +448,7 @@ void RedisCluster::RedisDiscoverySession::onResponse( // 3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf" // // Loop through the cluster slot response and error checks for each field. - uint64_t hostname_resolution_required_cnt = 0; + auto hostname_resolution_required_cnt = std::make_shared(0); for (const NetworkFilters::Common::Redis::RespValue& part : value->asArray()) { if (part.type() != NetworkFilters::Common::Redis::RespType::Array) { onUnexpectedResponse(value); @@ -482,7 +484,7 @@ void RedisCluster::RedisDiscoverySession::onResponse( const auto& array = slot_range[SlotPrimary].asArray(); slot.primary_hostname_ = array[0].asString(); slot.primary_port_ = array[1].asInteger(); - hostname_resolution_required_cnt++; + (*hostname_resolution_required_cnt)++; } // Row 4-N: Replica(s) addresses @@ -499,13 +501,13 @@ void RedisCluster::RedisDiscoverySession::onResponse( // Replica address is potentially a hostname, save it for async DNS resolution. const auto& array = replica->asArray(); slot.addReplicaToResolve(array[0].asString(), array[1].asInteger()); - hostname_resolution_required_cnt++; + (*hostname_resolution_required_cnt)++; } } cluster_slots->push_back(std::move(slot)); } - if (hostname_resolution_required_cnt > 0) { + if (*hostname_resolution_required_cnt > 0) { // DNS resolution is required, defer finalizing the slot update until resolution is complete. resolveClusterHostnames(std::move(cluster_slots), hostname_resolution_required_cnt); } else { diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index 1361cb52b46ad..74d36679157a0 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -245,9 +245,9 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { ipAddressFromClusterEntry(const std::vector& value); bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value); void resolveClusterHostnames(ClusterSlotsSharedPtr&& slots, - uint64_t& hostname_resolution_required_cnt); + std::shared_ptr hostname_resolution_required_cnt); void resolveReplicas(ClusterSlotsSharedPtr slots, std::size_t index, - uint64_t& hostname_resolution_required_cnt); + std::shared_ptr hostname_resolution_required_cnt); void finishClusterHostnameResolution(ClusterSlotsSharedPtr slots); void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response); From 70877836e712d5cbbaef2fc378637ffe6c91ad18 Mon Sep 17 00:00:00 2001 From: Sai Teja Duthuluri Date: Fri, 25 Feb 2022 11:30:36 -0800 Subject: [PATCH 21/21] update documentation Signed-off-by: Sai Teja Duthuluri --- docs/root/intro/arch_overview/other_protocols/redis.rst | 2 ++ docs/root/version_history/current.rst | 1 + source/extensions/clusters/redis/redis_cluster.cc | 7 +++++++ 3 files changed, 10 insertions(+) diff --git a/docs/root/intro/arch_overview/other_protocols/redis.rst b/docs/root/intro/arch_overview/other_protocols/redis.rst index 0349a0d8b888c..944879300450d 100644 --- a/docs/root/intro/arch_overview/other_protocols/redis.rst +++ b/docs/root/intro/arch_overview/other_protocols/redis.rst @@ -81,6 +81,8 @@ following information: * The primaries for each shard. * Nodes entering or leaving the cluster. +Envoy proxy supports identification of the nodes via both IP address and hostnames in the `cluster slots` command response. In case of failure to resolve a primary hostname, Envoy will retry resolution of all nodes periodically until success. Failure to resolve a replica simply skips that replica. + For topology configuration details, see the Redis Cluster :ref:`v3 API reference `. diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index b339fc8038734..b2b8402980d63 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -33,6 +33,7 @@ Removed Config or Runtime New Features ------------ * http3: downstream HTTP/3 support is now GA! Upstream HTTP/3 also GA for specific deployments. See :ref:`here ` for details. +* redis: support for hostnames returned in `cluster slots` response is now available. Deprecated diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index b0de4f897aef5..71ce6e69cd71b 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -320,6 +320,11 @@ void RedisCluster::RedisDiscoverySession::updateDnsStats( * We use the count of hostnames that require resolution to decide when the resolution process is * completed, and then call the post-resolution hooks. * + * If resolving any one of the primary replicas fails, we stop the resolution process and reset + * the timers to retry the resolution. Failure to resolve a replica, on the other hand does not + * stop the process. If we replica resolution fails, we simply log a warning, and move to resolving + * the rest. + * * @param slots the list of slots which may need DNS resolution * @param address_resolution_required_cnt the number of hostnames that need DNS resolution */ @@ -364,6 +369,8 @@ void RedisCluster::RedisDiscoverySession::resolveClusterHostnames( /** * Resolve the replicas in a cluster entry. If there are no replicas, simply return. * If all the hostnames have been resolved, call post-resolution methods. + * Failure to resolve a replica does not stop the overall resolution process. We log a + * warning, and move to the next one. * * @param slots the list of slots which may need DNS resolution * @param index the specific index into `slots` whose replicas need to be resolved