Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 73 additions & 14 deletions source/common/upstream/ring_hash_lb.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "common/upstream/ring_hash_lb.h"
#include "source/common/upstream/ring_hash_lb.h"

#include <cstdint>
#include <iostream>
Expand All @@ -7,8 +7,8 @@

#include "envoy/config/cluster/v3/cluster.pb.h"

#include "common/common/assert.h"
#include "common/upstream/load_balancer_impl.h"
#include "source/common/common/assert.h"
#include "source/common/upstream/load_balancer_impl.h"

#include "absl/container/inlined_vector.h"
#include "absl/strings/string_view.h"
Expand Down Expand Up @@ -58,8 +58,25 @@ HostConstSharedPtr RingHashLoadBalancer::Ring::chooseHost(uint64_t h, uint32_t a
// I've generally kept the variable names to make the code easier to compare.
// NOTE: The algorithm depends on using signed integers for lowp, midp, and highp. Do not
// change them!
int64_t lowp = 0;
int64_t highp = ring_.size();
int64_t lowp, highp;

// Algorithm is to shard the indices and lookup the host for a given hash from a bucket
// (instead of a lookup of all hosts). Hence fewer lookups/ accesses and faster execution.
#ifdef BUCKET_ALGORITHM

// Given a hash 'h', find the bucket index by shifting it to right (by rightShift).
uint64_t bucket_index = h >> rightShift;
// 'lowp' and 'highp' are the lower and upper indices of the bucket.
lowp = ring_bucket_[bucket_index];
highp = ring_bucket_[bucket_index + 1] - 1;

#else

lowp = 0;
highp = ring_.size();

#endif

int64_t midp = 0;
while (true) {
midp = (lowp + highp) / 2;
Expand Down Expand Up @@ -149,11 +166,10 @@ RingHashLoadBalancer::Ring::Ring(const NormalizedHostWeightVector& normalized_ho
uint64_t max_hashes_per_host = 0;
for (const auto& entry : normalized_host_weights) {
const auto& host = entry.first;
const std::string& address_string =
use_hostname_for_hashing ? host->hostname() : host->address()->asString();
ASSERT(!address_string.empty());
const absl::string_view key_to_hash = hashKey(host, use_hostname_for_hashing);
ASSERT(!key_to_hash.empty());

hash_key_buffer.assign(address_string.begin(), address_string.end());
hash_key_buffer.assign(key_to_hash.begin(), key_to_hash.end());
hash_key_buffer.emplace_back('_');
auto offset_start = hash_key_buffer.end();

Expand All @@ -173,7 +189,7 @@ RingHashLoadBalancer::Ring::Ring(const NormalizedHostWeightVector& normalized_ho
? MurmurHash::murmurHash2(hash_key, MurmurHash::STD_HASH_SEED)
: HashUtil::xxHash64(hash_key);

ENVOY_LOG(trace, "ring hash: hash_key={} hash={}", hash_key.data(), hash);
ENVOY_LOG(trace, "ring hash: hash_key={} hash={}", hash_key, hash);
ring_.push_back({hash, host});
++i;
++current_hashes;
Expand All @@ -188,12 +204,55 @@ RingHashLoadBalancer::Ring::Ring(const NormalizedHostWeightVector& normalized_ho
});
if (ENVOY_LOG_CHECK_LEVEL(trace)) {
for (const auto& entry : ring_) {
ENVOY_LOG(trace, "ring hash: host={} hash={}",
use_hostname_for_hashing ? entry.host_->hostname()
: entry.host_->address()->asString(),
entry.hash_);
const absl::string_view key_to_hash = hashKey(entry.host_, use_hostname_for_hashing);
ENVOY_LOG(trace, "ring hash: host={} hash={}", key_to_hash, entry.hash_);
}
}

#ifdef BUCKET_ALGORITHM
/******* BEGIN: code for bucketing algorithm *******/

// Find MSB bit of the first hash so we can right shift all the other hashes to create buckets.
int msb = 0;
uint64_t n = ring_[0].hash_;
n = n / 2;
while (n != 0) {
n = n / 2;
msb++;
}
// Arbitrarily choosing MSB + 10 bits to shift to right for creating bucket. The larger the shift
// to right, the fewer the buckets. One can experiments with + 10, 12, 14, ... 20.
msb += 10;
// Save it for later use (when doing lookup of hosts for a given hash).
rightShift = msb;

// Reserve memory for bucket indices. Worst-case, every hash belongs to a different bucket!
// The ring_bucket_ container stores the start indices of the buckets.
ring_bucket_.reserve(ring_size);

// Right shift each hash and create buckets of the hosts.
uint64_t ring_index = 0;
uint64_t curr_bucket, prev_bucket = 0;

// push the first index if the ring_ isn't empty
if(!ring_.empty())
ring_bucket_.push_back(ring_index);

for (const auto& entry : ring_) {
curr_bucket = entry.hash_ >> msb;
// If new bucket found, push the index to ring_bucket_ and update curr_bucket.
if(curr_bucket != prev_bucket)
{
prev_bucket = curr_bucket;
ring_bucket_.push_back(ring_index);
}
ring_index++;
}
// For the last bucket, we need end and hence storing ring_size value.
ring_bucket_.push_back(ring_size);

/******* END: code for bucketing algorithm *******/
#endif

stats_.size_.set(ring_size);
stats_.min_hashes_per_host_.set(min_hashes_per_host);
Expand Down
11 changes: 8 additions & 3 deletions source/common/upstream/ring_hash_lb.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"

#include "common/common/logger.h"
#include "common/upstream/thread_aware_lb_impl.h"
#include "source/common/common/logger.h"
#include "source/common/upstream/thread_aware_lb_impl.h"

namespace Envoy {
namespace Upstream {
Expand Down Expand Up @@ -66,6 +66,11 @@ class RingHashLoadBalancer : public ThreadAwareLoadBalancerBase,

std::vector<RingEntry> ring_;

#ifdef BUCKET_ALGORITHM
std::vector<uint64_t> ring_bucket_;
uint64_t rightShift;
#endif

RingHashLoadBalancerStats& stats_;
};
using RingConstSharedPtr = std::shared_ptr<const Ring>;
Expand All @@ -87,7 +92,7 @@ class RingHashLoadBalancer : public ThreadAwareLoadBalancerBase,

static RingHashLoadBalancerStats generateStats(Stats::Scope& scope);

Stats::ScopePtr scope_;
Stats::ScopeSharedPtr scope_;
RingHashLoadBalancerStats stats_;

static const uint64_t DefaultMinRingSize = 1024;
Expand Down